Spark自定义累加器的运用实例详解
累加器(accumulator)是Spark中供给的一种散布式的变量机制,其道理相似于mapreduce,即散布式的转变,然后聚合这些转变。累加器的一个常见用途是在调试时对作业施行历程中的事件进行计数。
累加器简略运用
Spark内置的供给了Long和Double类型的累加器。下面是一个简略的运用示例,在这个例子中我们在过滤掉RDD中奇数的同时进行计数,最后盘算剩下整数的和。
val sparkConf = new SparkConf().setAppName("Test").setMaster("local[2]") val sc = new SparkContext(sparkConf) val accum = sc.longAccumulator("longAccum") //统计奇数的个数 val sum = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).filter(n=>{ if(n%2!=0) accum.add(1L) n%2==0 }).reduce(_+_) println("sum: "+sum) println("accum: "+accum.value) sc.stop()
效果为:
sum: 20
accum: 5
这是效果正常的状况,但是在运用累加器的历程中要是关于spark的施行历程了解的不足深入就会碰到两类典型的差错:少加(或者没加)、多加。
自定义累加器
自定义累加器类型的功能在1.X版本中就已经供给了,但是运用起来比较费事,在2.0版本后,累加器的易用性有了较大的改善,而且官方还供给了一个新的抽象类:AccumulatorV2来供给更加友爱的自定义类型累加器的实现方式。官方同时给出了一个实现的示例:CollectionAccumulator类,这个类允许以汇合的情势收集spark利用施行历程中的一些信息。例如,我们可以用这个类收集Spark处置数据时的一些细节,当然,因为累加器的值终究要会聚到driver端,为了以免 driver端的outofmemory题目,需要对收集的信息的规模要加以控制,不宜过大。
继承AccumulatorV2类,并复写它的所有要领
package spark import constant.Constant import org.apache.spark.util.AccumulatorV2 import util.getFieldFromConcatString import util.setFieldFromConcatString open class SessionAccmulator : AccumulatorV2<String, String>() { private var result = Constant.SESSION_COUNT + "=0|"+ Constant.TIME_PERIOD_1s_3s + "=0|"+ Constant.TIME_PERIOD_4s_6s + "=0|"+ Constant.TIME_PERIOD_7s_9s + "=0|"+ Constant.TIME_PERIOD_10s_30s + "=0|"+ Constant.TIME_PERIOD_30s_60s + "=0|"+ Constant.TIME_PERIOD_1m_3m + "=0|"+ Constant.TIME_PERIOD_3m_10m + "=0|"+ Constant.TIME_PERIOD_10m_30m + "=0|"+ Constant.TIME_PERIOD_30m + "=0|"+ Constant.STEP_PERIOD_1_3 + "=0|"+ Constant.STEP_PERIOD_4_6 + "=0|"+ Constant.STEP_PERIOD_7_9 + "=0|"+ Constant.STEP_PERIOD_10_30 + "=0|"+ Constant.STEP_PERIOD_30_60 + "=0|"+ Constant.STEP_PERIOD_60 + "=0" override fun value(): String { return this.result } /** * 合并数据 */ override fun merge(other: AccumulatorV2<String, String>?) { if (other == null) return else { if (other is SessionAccmulator) { var newResult = "" val resultArray = arrayOf(Constant.SESSION_COUNT,Constant.TIME_PERIOD_1s_3s, Constant.TIME_PERIOD_4s_6s, Constant.TIME_PERIOD_7s_9s, Constant.TIME_PERIOD_10s_30s, Constant.TIME_PERIOD_30s_60s, Constant.TIME_PERIOD_1m_3m, Constant.TIME_PERIOD_3m_10m, Constant.TIME_PERIOD_10m_30m, Constant.TIME_PERIOD_30m, Constant.STEP_PERIOD_1_3, Constant.STEP_PERIOD_4_6, Constant.STEP_PERIOD_7_9, Constant.STEP_PERIOD_10_30, Constant.STEP_PERIOD_30_60, Constant.STEP_PERIOD_60) resultArray.forEach { val oldValue = other.result.getFieldFromConcatString("|", it) if (oldValue.isNotEmpty()) { val newValue = oldValue.toInt() + 1 //找到缘由,不断在轮回赋予值,debug30分钟 很烦 if (newResult.isEmpty()){ newResult = result.setFieldFromConcatString("|", it, newValue.toString()) } //题目就在于这里,自定义没有写错,合并错了 newResult = newResult.setFieldFromConcatString("|", it, newValue.toString()) } } result = newResult } } } override fun copy(): AccumulatorV2<String, String> { val sessionAccmulator = SessionAccmulator() sessionAccmulator.result = this.result return sessionAccmulator } override fun add(p0: String?) { val v1 = this.result val v2 = p0 if (v2.isNullOrEmpty()){ return }else{ var newResult = "" val oldValue = v1.getFieldFromConcatString("|", v2!!) if (oldValue.isNotEmpty()){ val newValue = oldValue.toInt() + 1 newResult = result.setFieldFromConcatString("|", v2, newValue.toString()) } result = newResult } } override fun reset() { val newResult = Constant.SESSION_COUNT + "=0|"+ Constant.TIME_PERIOD_1s_3s + "=0|"+ Constant.TIME_PERIOD_4s_6s + "=0|"+ Constant.TIME_PERIOD_7s_9s + "=0|"+ Constant.TIME_PERIOD_10s_30s + "=0|"+ Constant.TIME_PERIOD_30s_60s + "=0|"+ Constant.TIME_PERIOD_1m_3m + "=0|"+ Constant.TIME_PERIOD_3m_10m + "=0|"+ Constant.TIME_PERIOD_10m_30m + "=0|"+ Constant.TIME_PERIOD_30m + "=0|"+ Constant.STEP_PERIOD_1_3 + "=0|"+ Constant.STEP_PERIOD_4_6 + "=0|"+ Constant.STEP_PERIOD_7_9 + "=0|"+ Constant.STEP_PERIOD_10_30 + "=0|"+ Constant.STEP_PERIOD_30_60 + "=0|"+ Constant.STEP_PERIOD_60 + "=0" result = newResult } override fun isZero(): Boolean { val newResult = Constant.SESSION_COUNT + "=0|"+ Constant.TIME_PERIOD_1s_3s + "=0|"+ Constant.TIME_PERIOD_4s_6s + "=0|"+ Constant.TIME_PERIOD_7s_9s + "=0|"+ Constant.TIME_PERIOD_10s_30s + "=0|"+ Constant.TIME_PERIOD_30s_60s + "=0|"+ Constant.TIME_PERIOD_1m_3m + "=0|"+ Constant.TIME_PERIOD_3m_10m + "=0|"+ Constant.TIME_PERIOD_10m_30m + "=0|"+ Constant.TIME_PERIOD_30m + "=0|"+ Constant.STEP_PERIOD_1_3 + "=0|"+ Constant.STEP_PERIOD_4_6 + "=0|"+ Constant.STEP_PERIOD_7_9 + "=0|"+ Constant.STEP_PERIOD_10_30 + "=0|"+ Constant.STEP_PERIOD_30_60 + "=0|"+ Constant.STEP_PERIOD_60 + "=0" return this.result == newResult } }
要领介绍
value要领:获取累加器中的值
merge要领:该要领特殊重要,一定要写对,这个要领是各个task的累加器进行合并的要领(下面介绍施行流程中将要用到)
iszero要领:判断是否为初始值
reset要领:重置累加器中的值
copy要领:拷贝累加器
spark中累加器的施行流程:
首先有几个task,spark engine就调用copy要领拷贝几个累加器(不注册的),然后在各个task中进行累加(注意在此历程中,被最初注册的累加器的值是不变的),施行最后将调用merge要领和各个task的效果累计器进行合并(此时被注册的累加器是初始值)
总结
以上就是本文对于Spark自定义累加器的运用实例详解的全部内容,但愿对大家有所帮忙。有什么题目可以随时留言,小编会及时回复大家的。
热门标签:dede模板 / destoon模板 / dedecms模版 / 织梦模板