累加器(accumulator)是Spark中提供的一種分布式的變量機(jī)制,其原理類似于mapreduce,即分布式的改變,然后聚合這些改變。累加器的一個常見用途是在調(diào)試時對作業(yè)執(zhí)行過程中的事件進(jìn)行計數(shù)。
累加器簡單使用
Spark內(nèi)置的提供了Long和Double類型的累加器。下面是一個簡單的使用示例,在這個例子中我們在過濾掉RDD中奇數(shù)的同時進(jìn)行計數(shù),最后計算剩下整數(shù)的和。
1
2
3
4
5
6
7
8
9
10
|
val sparkConf = new SparkConf().setAppName( "Test" ).setMaster( "local[2]" ) val sc = new SparkContext(sparkConf) val accum = sc.longAccumulator( "longAccum" ) //統(tǒng)計奇數(shù)的個數(shù) val sum = sc.parallelize(Array( 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 ), 2 ).filter(n=>{ if (n% 2 != 0 ) accum.add(1L) n% 2 == 0 }).reduce(_+_) println( "sum: " +sum) println( "accum: " +accum.value) sc.stop() |
結(jié)果為:
sum: 20
accum: 5
這是結(jié)果正常的情況,但是在使用累加器的過程中如果對于spark的執(zhí)行過程理解的不夠深入就會遇到兩類典型的錯誤:少加(或者沒加)、多加。
自定義累加器
自定義累加器類型的功能在1.X版本中就已經(jīng)提供了,但是使用起來比較麻煩,在2.0版本后,累加器的易用性有了較大的改進(jìn),而且官方還提供了一個新的抽象類:AccumulatorV2來提供更加友好的自定義類型累加器的實現(xiàn)方式。官方同時給出了一個實現(xiàn)的示例:CollectionAccumulator類,這個類允許以集合的形式收集spark應(yīng)用執(zhí)行過程中的一些信息。例如,我們可以用這個類收集Spark處理數(shù)據(jù)時的一些細(xì)節(jié),當(dāng)然,由于累加器的值最終要匯聚到driver端,為了避免 driver端的outofmemory問題,需要對收集的信息的規(guī)模要加以控制,不宜過大。
繼承AccumulatorV2類,并復(fù)寫它的所有方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
package spark import constant.Constant import org.apache.spark.util.AccumulatorV2 import util.getFieldFromConcatString import util.setFieldFromConcatString open class SessionAccmulator : AccumulatorV2<String, String>() { private var result = Constant.SESSION_COUNT + "=0|" + Constant.TIME_PERIOD_1s_3s + "=0|" + Constant.TIME_PERIOD_4s_6s + "=0|" + Constant.TIME_PERIOD_7s_9s + "=0|" + Constant.TIME_PERIOD_10s_30s + "=0|" + Constant.TIME_PERIOD_30s_60s + "=0|" + Constant.TIME_PERIOD_1m_3m + "=0|" + Constant.TIME_PERIOD_3m_10m + "=0|" + Constant.TIME_PERIOD_10m_30m + "=0|" + Constant.TIME_PERIOD_30m + "=0|" + Constant.STEP_PERIOD_1_3 + "=0|" + Constant.STEP_PERIOD_4_6 + "=0|" + Constant.STEP_PERIOD_7_9 + "=0|" + Constant.STEP_PERIOD_10_30 + "=0|" + Constant.STEP_PERIOD_30_60 + "=0|" + Constant.STEP_PERIOD_60 + "=0" override fun value(): String { return this .result } /** * 合并數(shù)據(jù) */ override fun merge(other: AccumulatorV2<String, String>?) { if (other == null ) return else { if (other is SessionAccmulator) { var newResult = "" val resultArray = arrayOf(Constant.SESSION_COUNT,Constant.TIME_PERIOD_1s_3s, Constant.TIME_PERIOD_4s_6s, Constant.TIME_PERIOD_7s_9s, Constant.TIME_PERIOD_10s_30s, Constant.TIME_PERIOD_30s_60s, Constant.TIME_PERIOD_1m_3m, Constant.TIME_PERIOD_3m_10m, Constant.TIME_PERIOD_10m_30m, Constant.TIME_PERIOD_30m, Constant.STEP_PERIOD_1_3, Constant.STEP_PERIOD_4_6, Constant.STEP_PERIOD_7_9, Constant.STEP_PERIOD_10_30, Constant.STEP_PERIOD_30_60, Constant.STEP_PERIOD_60) resultArray.forEach { val oldValue = other.result.getFieldFromConcatString( "|" , it) if (oldValue.isNotEmpty()) { val newValue = oldValue.toInt() + 1 //找到原因,一直在循環(huán)賦予值,debug30分鐘 很煩 if (newResult.isEmpty()){ newResult = result.setFieldFromConcatString( "|" , it, newValue.toString()) } //問題就在于這里,自定義沒有寫錯,合并錯了 newResult = newResult.setFieldFromConcatString( "|" , it, newValue.toString()) } } result = newResult } } } override fun copy(): AccumulatorV2<String, String> { val sessionAccmulator = SessionAccmulator() sessionAccmulator.result = this .result return sessionAccmulator } override fun add(p0: String?) { val v1 = this .result val v2 = p0 if (v2.isNullOrEmpty()){ return } else { var newResult = "" val oldValue = v1.getFieldFromConcatString( "|" , v2!!) if (oldValue.isNotEmpty()){ val newValue = oldValue.toInt() + 1 newResult = result.setFieldFromConcatString( "|" , v2, newValue.toString()) } result = newResult } } override fun reset() { val newResult = Constant.SESSION_COUNT + "=0|" + Constant.TIME_PERIOD_1s_3s + "=0|" + Constant.TIME_PERIOD_4s_6s + "=0|" + Constant.TIME_PERIOD_7s_9s + "=0|" + Constant.TIME_PERIOD_10s_30s + "=0|" + Constant.TIME_PERIOD_30s_60s + "=0|" + Constant.TIME_PERIOD_1m_3m + "=0|" + Constant.TIME_PERIOD_3m_10m + "=0|" + Constant.TIME_PERIOD_10m_30m + "=0|" + Constant.TIME_PERIOD_30m + "=0|" + Constant.STEP_PERIOD_1_3 + "=0|" + Constant.STEP_PERIOD_4_6 + "=0|" + Constant.STEP_PERIOD_7_9 + "=0|" + Constant.STEP_PERIOD_10_30 + "=0|" + Constant.STEP_PERIOD_30_60 + "=0|" + Constant.STEP_PERIOD_60 + "=0" result = newResult } override fun isZero(): Boolean { val newResult = Constant.SESSION_COUNT + "=0|" + Constant.TIME_PERIOD_1s_3s + "=0|" + Constant.TIME_PERIOD_4s_6s + "=0|" + Constant.TIME_PERIOD_7s_9s + "=0|" + Constant.TIME_PERIOD_10s_30s + "=0|" + Constant.TIME_PERIOD_30s_60s + "=0|" + Constant.TIME_PERIOD_1m_3m + "=0|" + Constant.TIME_PERIOD_3m_10m + "=0|" + Constant.TIME_PERIOD_10m_30m + "=0|" + Constant.TIME_PERIOD_30m + "=0|" + Constant.STEP_PERIOD_1_3 + "=0|" + Constant.STEP_PERIOD_4_6 + "=0|" + Constant.STEP_PERIOD_7_9 + "=0|" + Constant.STEP_PERIOD_10_30 + "=0|" + Constant.STEP_PERIOD_30_60 + "=0|" + Constant.STEP_PERIOD_60 + "=0" return this .result == newResult } } |
方法介紹
value方法:獲取累加器中的值
merge方法:該方法特別重要,一定要寫對,這個方法是各個task的累加器進(jìn)行合并的方法(下面介紹執(zhí)行流程中將要用到)
iszero方法:判斷是否為初始值
reset方法:重置累加器中的值
copy方法:拷貝累加器
spark中累加器的執(zhí)行流程:
首先有幾個task,spark engine就調(diào)用copy方法拷貝幾個累加器(不注冊的),然后在各個task中進(jìn)行累加(注意在此過程中,被最初注冊的累加器的值是不變的),執(zhí)行最后將調(diào)用merge方法和各個task的結(jié)果累計器進(jìn)行合并(此時被注冊的累加器是初始值)
總結(jié)
以上就是本文關(guān)于Spark自定義累加器的使用實例詳解的全部內(nèi)容,希望對大家有所幫助。有什么問題可以隨時留言,小編會及時回復(fù)大家的。
原文鏈接:http://www.cnblogs.com/zhangweilun/p/6684776.html