Spark的播送变量和累加器运用要领代码示例
一、播送变量和累加器
平常状况下,当向Spark操纵(如map,reduce)通报一个函数时,它会在一个长途集群节点上施行,它会运用函数中所有变量的副本。这些变量被复制到所有的机器上,长途机器上并没有被更新的变量会向驱动程序回传。在任务之间运用通用的,支撑读写的同享变量是低效的。只管如此,Spark供给了两种有限类型的同享变量,播送变量和累加器。
1.1 播送变量:
播送变量允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间通报变量。播送变量可被用于有效地给每个节点一个大输入数据集的副本。Spark还尝试运用高效地播送算法来分发变量,进而减少通讯的开销。
Spark的行动通过一系列的步骤施行,这些步骤由散布式的shuffle操纵分开。Spark主动地播送每个步骤每个任务需要的通用数据。这些播送数据被序列化地缓存,在运转任务以前被反序列化出来。这意味着当我们需要在多个阶段的任务之间运用雷同的数据,或者以反序列化情势缓存数据是十分重要的时候,显式地新建播送变量才有用。
通过在一个变量v上调用SparkContext.broadcast(v)可以新建播送变量。播送变量是环绕着v的封装,可以通过value要领访问这个变量。举例如下:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0) scala> broadcastVar.value res0: Array[Int] = Array(1, 2, 3)
在新建了播送变量之后,在集群上的所有函数中应当运用它来替换运用v.这样v就不会不止一次地在节点之间传输了。另外,为了确保所有的节点获得雷同的变量,对象v在被播送之后就不应当再修改。
1.2 累加器:
累加器是仅仅被相干操纵累加的变量,因而可以在并行中被有效地支撑。它可以被用来实现计数器和总和。Spark原生地只支撑数字类型的累加器,编程者可以增加新类型的支撑。要是新建累加器时指定了名字,可以在Spark的UI界面看到。这有益于了解每个施行阶段的进程。(关于python还不支撑)
累加器通过对一个初始化了的变量v调用SparkContext.accumulator(v)来新建。在集群上运转的任务可以通过add或者”+=”要领在累加器上进行累加操纵。但是,它们不能读取它的值。只要驱动程序能够读取它的值,通过累加器的value要领。
下面的代码展现了怎样把一个数组中的所有元素累加到累加器上:
scala> val accum = sc.accumulator(0, "My Accumulator") accum: spark.Accumulator[Int] = 0 scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s scala> accum.value res2: Int = 10
只管上面的例子运用了内置支撑的累加器类型Int,但是开发人员也可以通过继承AccumulatorParam类来新建它们本人的累加器类型。AccumulatorParam接口有两个要领:
zero要领为你的类型供给一个0值。
addInPlace要领将两个值相加。
假如我们有一个代表数学vector的Vector类。我们可以向下面这样实现:
object VectorAccumulatorParam extends AccumulatorParam[Vector] { def zero(initialValue: Vector): Vector = { Vector.zeros(initialValue.size) } def addInPlace(v1: Vector, v2: Vector): Vector = { v1 += v2 } } // Then, create an Accumulator of this type: val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)
在Scala里,Spark供给更通用的累加接口来累加数据,只管效果的类型和累加的数据类型可能不一致(例如,通过收集在一起的元从来新建一个列表)。同时,SparkContext..accumulableCollection要领来累加通用的Scala的汇合类型。
累加器仅仅在行动操纵内部被更新,Spark保证每个任务在累加器上的更新操纵只被施行一次,也就是说,重新启动任务也不会更新。在转换操纵中,会员必须意识到每个任务对累加器的更新操纵可能被不只一次施行,要是从新施行了任务和作业的阶段。
累加器并没有转变Spark的惰性求值模型。要是它们被RDD上的操纵更新,它们的值只要当RDD由于行动操纵被盘算时才被更新。因而,当施行一个惰性的转换操纵,比方map时,不能保证对累加器值的更新被现实施行了。下面的代码片段演示了此特性:
val accum = sc.accumulator(0) data.map { x => accum += x; f(x) } //在这里,accum的值依然是0,由于没有行动操纵引起map被现实的盘算.
二.Java和Scala版本的实战演示
2.1 Java版本:
/** * 实例:应用播送进行黑名单过滤! * 检查新的数据 依据是否在播送变量-黑名单内,从而实现过滤数据。 */ public class BroadcastAccumulator { /** * 新建一个List的播送变量 * */ private static volatile Broadcast<List<String>> broadcastList = null; /** * 计数器! */ private static volatile Accumulator<Integer> accumulator = null; public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local[2]"). setAppName("WordCountOnlineBroadcast"); JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5)); /** * 注意:分发播送需要一个action操纵触发。 * 注意:播送的是Arrays的asList 而非对象的援用。播送Array数组的对象援用会出错。 * 运用broadcast播送黑名单到每个Executor中! */ broadcastList = jsc.sc().broadcast(Arrays.asList("Hadoop","Mahout","Hive")); /** * 累加器作为全局计数器!用于统计在线过滤了多少个黑名单! * 在这里实例化。 */ accumulator = jsc.sparkContext().accumulator(0,"OnlineBlackListCounter"); JavaReceiverInputDStream<String> lines = jsc.socketTextStream("Master", 9999); /** * 这里省去flatmap由于名单是一个个的! */ JavaPairDStream<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String word) { return new Tuple2<String, Integer>(word, 1); } }); JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) { return v1 + v2; } }); /** * Funtion里面 前几个参数是 入参。 * 背面的出参。 * 表现在call要领里面! * */ wordsCount.foreach(new Function2<JavaPairRDD<String, Integer>, Time, Void>() { @Override public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws Exception { rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() { @Override public Boolean call(Tuple2<String, Integer> wordPair) throws Exception { if (broadcastList.value().contains(wordPair._1)) { /** * accumulator不仅仅用来计数。 * 可以同时写进数据库或者缓存中。 */ accumulator.add(wordPair._2); return false; }else { return true; } }; /** * 播送和计数器的施行,需要进行一个action操纵! */ }).collect(); System.out.println("播送器里面的值"+broadcastList.value()); System.out.println("计时器里面的值"+accumulator.value()); return null; } }); jsc.start(); jsc.awaitTermination(); jsc.close(); } }
2.2 Scala版本
package com.Streaming import java.util import org.apache.spark.streaming.{Duration, StreamingContext} import org.apache.spark.{Accumulable, Accumulator, SparkContext, SparkConf} import org.apache.spark.broadcast.Broadcast /** * Created by lxh on 2016/6/30. */ object BroadcastAccumulatorStreaming { /** * 声明一个播送和累加器! */ private var broadcastList:Broadcast[List[String]] = _ private var accumulator:Accumulator[Int] = _ def main(args: Array[String]) { val sparkConf = new SparkConf().setMaster("local[4]").setAppName("broadcasttest") val sc = new SparkContext(sparkConf) /** * duration是ms */ val ssc = new StreamingContext(sc,Duration(2000)) // broadcastList = ssc.sparkContext.broadcast(util.Arrays.asList("Hadoop","Spark")) broadcastList = ssc.sparkContext.broadcast(List("Hadoop","Spark")) accumulator= ssc.sparkContext.accumulator(0,"broadcasttest") /** * 获取数据! */ val lines = ssc.socketTextStream("localhost",9999) /** * 1.flatmap把行分割成词。 * 2.map把词变成tuple(word,1) * 3.reducebykey累加value * (4.sortBykey排名) * 4.进行过滤。 value是否在累加器中。 * 5.打印显示。 */ val words = lines.flatMap(line => line.split(" ")) val wordpair = words.map(word => (word,1)) wordpair.filter(record => {broadcastList.value.contains(record._1)}) val pair = wordpair.reduceByKey(_+_) /** * 这个pair 是PairDStream<String, Integer> * 查看这个id是否在黑名单中,要是是的话,累加器就+1 */ /* pair.foreachRDD(rdd => { rdd.filter(record => { if (broadcastList.value.contains(record._1)) { accumulator.add(1) return true } else { return false } }) })*/ val filtedpair = pair.filter(record => { if (broadcastList.value.contains(record._1)) { accumulator.add(record._2) true } else { false } }).print println("累加器的值"+accumulator.value) // pair.filter(record => {broadcastList.value.contains(record._1)}) /* val keypair = pair.map(pair => (pair._2,pair._1))*/ /** * 要是DStream本人没有某个算子操纵。就通过转化transform! */ /* keypair.transform(rdd => { rdd.sortByKey(false)//TODO })*/ pair.print() ssc.start() ssc.awaitTermination() } }
总结
以上就是本文对于Spark的播送变量和累加器运用要领代码示例的全部内容,但愿对大家有所帮忙。感乐趣的朋友可以参阅:详解Java编写并运转spark利用程序的要领 、 Spark入门简介等,有什么题目可以随时留言,小编会及时回复大家。谢谢朋友们对我们网站的支撑。
热门标签:dede模板 / destoon模板 / dedecms模版 / 织梦模板