国产片侵犯亲女视频播放_亚洲精品二区_在线免费国产视频_欧美精品一区二区三区在线_少妇久久久_在线观看av不卡

服務(wù)器之家:專注于服務(wù)器技術(shù)及軟件下載分享
分類導(dǎo)航

云服務(wù)器|WEB服務(wù)器|FTP服務(wù)器|郵件服務(wù)器|虛擬主機(jī)|服務(wù)器安全|DNS服務(wù)器|服務(wù)器知識|Nginx|IIS|Tomcat|

服務(wù)器之家 - 服務(wù)器技術(shù) - 服務(wù)器知識 - Spark自定義累加器的使用實例詳解

Spark自定義累加器的使用實例詳解

2020-08-12 21:54willian_zhang 服務(wù)器知識

這篇文章主要介紹了Spark累加器的相關(guān)內(nèi)容,首先介紹了累加器的簡單使用,然后向大家分享了自定義累加器的實例代碼,需要的朋友可以參考下。

累加器(accumulator)是Spark中提供的一種分布式的變量機(jī)制,其原理類似于mapreduce,即分布式的改變,然后聚合這些改變。累加器的一個常見用途是在調(diào)試時對作業(yè)執(zhí)行過程中的事件進(jìn)行計數(shù)。

累加器簡單使用

Spark內(nèi)置的提供了Long和Double類型的累加器。下面是一個簡單的使用示例,在這個例子中我們在過濾掉RDD中奇數(shù)的同時進(jìn)行計數(shù),最后計算剩下整數(shù)的和。

?
1
2
3
4
5
6
7
8
9
10
val sparkConf = new SparkConf().setAppName("Test").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val accum = sc.longAccumulator("longAccum") //統(tǒng)計奇數(shù)的個數(shù)
val sum = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).filter(n=>{
 if(n%2!=0) accum.add(1L) 
 n%2==0
}).reduce(_+_)
println("sum: "+sum)
println("accum: "+accum.value)
sc.stop()

結(jié)果為:

sum: 20
accum: 5

這是結(jié)果正常的情況,但是在使用累加器的過程中如果對于spark的執(zhí)行過程理解的不夠深入就會遇到兩類典型的錯誤:少加(或者沒加)、多加。

自定義累加器

自定義累加器類型的功能在1.X版本中就已經(jīng)提供了,但是使用起來比較麻煩,在2.0版本后,累加器的易用性有了較大的改進(jìn),而且官方還提供了一個新的抽象類:AccumulatorV2來提供更加友好的自定義類型累加器的實現(xiàn)方式。官方同時給出了一個實現(xiàn)的示例:CollectionAccumulator類,這個類允許以集合的形式收集spark應(yīng)用執(zhí)行過程中的一些信息。例如,我們可以用這個類收集Spark處理數(shù)據(jù)時的一些細(xì)節(jié),當(dāng)然,由于累加器的值最終要匯聚到driver端,為了避免 driver端的outofmemory問題,需要對收集的信息的規(guī)模要加以控制,不宜過大。

繼承AccumulatorV2類,并復(fù)寫它的所有方法

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package spark
import constant.Constant
import org.apache.spark.util.AccumulatorV2
import util.getFieldFromConcatString
import util.setFieldFromConcatString
open class SessionAccmulator : AccumulatorV2<String, String>() {
  private var result = Constant.SESSION_COUNT + "=0|"+
      Constant.TIME_PERIOD_1s_3s + "=0|"+
      Constant.TIME_PERIOD_4s_6s + "=0|"+
      Constant.TIME_PERIOD_7s_9s + "=0|"+
      Constant.TIME_PERIOD_10s_30s + "=0|"+
      Constant.TIME_PERIOD_30s_60s + "=0|"+
      Constant.TIME_PERIOD_1m_3m + "=0|"+
      Constant.TIME_PERIOD_3m_10m + "=0|"+
      Constant.TIME_PERIOD_10m_30m + "=0|"+
      Constant.TIME_PERIOD_30m + "=0|"+
      Constant.STEP_PERIOD_1_3 + "=0|"+
      Constant.STEP_PERIOD_4_6 + "=0|"+
      Constant.STEP_PERIOD_7_9 + "=0|"+
      Constant.STEP_PERIOD_10_30 + "=0|"+
      Constant.STEP_PERIOD_30_60 + "=0|"+
      Constant.STEP_PERIOD_60 + "=0"
  override fun value(): String {
    return this.result
  }
  /**
   * 合并數(shù)據(jù)
   */
  override fun merge(other: AccumulatorV2<String, String>?) {
    if (other == null) return else {
      if (other is SessionAccmulator) {
        var newResult = ""
        val resultArray = arrayOf(Constant.SESSION_COUNT,Constant.TIME_PERIOD_1s_3s, Constant.TIME_PERIOD_4s_6s, Constant.TIME_PERIOD_7s_9s,
            Constant.TIME_PERIOD_10s_30s, Constant.TIME_PERIOD_30s_60s, Constant.TIME_PERIOD_1m_3m,
            Constant.TIME_PERIOD_3m_10m, Constant.TIME_PERIOD_10m_30m, Constant.TIME_PERIOD_30m,
            Constant.STEP_PERIOD_1_3, Constant.STEP_PERIOD_4_6, Constant.STEP_PERIOD_7_9,
            Constant.STEP_PERIOD_10_30, Constant.STEP_PERIOD_30_60, Constant.STEP_PERIOD_60)
        resultArray.forEach {
          val oldValue = other.result.getFieldFromConcatString("|", it)
          if (oldValue.isNotEmpty()) {
            val newValue = oldValue.toInt() + 1
            //找到原因,一直在循環(huán)賦予值,debug30分鐘 很煩
            if (newResult.isEmpty()){
              newResult = result.setFieldFromConcatString("|", it, newValue.toString())
            }
            //問題就在于這里,自定義沒有寫錯,合并錯了
            newResult = newResult.setFieldFromConcatString("|", it, newValue.toString())
          }
        }
        result = newResult
      }
    }
  }
  override fun copy(): AccumulatorV2<String, String> {
    val sessionAccmulator = SessionAccmulator()
    sessionAccmulator.result = this.result
    return sessionAccmulator
  }
  override fun add(p0: String?) {
    val v1 = this.result
    val v2 = p0
    if (v2.isNullOrEmpty()){
      return
    }else{
      var newResult = ""
      val oldValue = v1.getFieldFromConcatString("|", v2!!)
      if (oldValue.isNotEmpty()){
        val newValue = oldValue.toInt() + 1
        newResult = result.setFieldFromConcatString("|", v2, newValue.toString())
      }
      result = newResult
    }
  }
  override fun reset() {
    val newResult = Constant.SESSION_COUNT + "=0|"+
        Constant.TIME_PERIOD_1s_3s + "=0|"+
        Constant.TIME_PERIOD_4s_6s + "=0|"+
        Constant.TIME_PERIOD_7s_9s + "=0|"+
        Constant.TIME_PERIOD_10s_30s + "=0|"+
        Constant.TIME_PERIOD_30s_60s + "=0|"+
        Constant.TIME_PERIOD_1m_3m + "=0|"+
        Constant.TIME_PERIOD_3m_10m + "=0|"+
        Constant.TIME_PERIOD_10m_30m + "=0|"+
        Constant.TIME_PERIOD_30m + "=0|"+
        Constant.STEP_PERIOD_1_3 + "=0|"+
        Constant.STEP_PERIOD_4_6 + "=0|"+
        Constant.STEP_PERIOD_7_9 + "=0|"+
        Constant.STEP_PERIOD_10_30 + "=0|"+
        Constant.STEP_PERIOD_30_60 + "=0|"+
        Constant.STEP_PERIOD_60 + "=0"
    result = newResult
  }
  override fun isZero(): Boolean {
    val newResult = Constant.SESSION_COUNT + "=0|"+
        Constant.TIME_PERIOD_1s_3s + "=0|"+
        Constant.TIME_PERIOD_4s_6s + "=0|"+
        Constant.TIME_PERIOD_7s_9s + "=0|"+
        Constant.TIME_PERIOD_10s_30s + "=0|"+
        Constant.TIME_PERIOD_30s_60s + "=0|"+
        Constant.TIME_PERIOD_1m_3m + "=0|"+
        Constant.TIME_PERIOD_3m_10m + "=0|"+
        Constant.TIME_PERIOD_10m_30m + "=0|"+
        Constant.TIME_PERIOD_30m + "=0|"+
        Constant.STEP_PERIOD_1_3 + "=0|"+
        Constant.STEP_PERIOD_4_6 + "=0|"+
        Constant.STEP_PERIOD_7_9 + "=0|"+
        Constant.STEP_PERIOD_10_30 + "=0|"+
        Constant.STEP_PERIOD_30_60 + "=0|"+
        Constant.STEP_PERIOD_60 + "=0"
    return this.result == newResult
  }
}

方法介紹

value方法:獲取累加器中的值

       merge方法:該方法特別重要,一定要寫對,這個方法是各個task的累加器進(jìn)行合并的方法(下面介紹執(zhí)行流程中將要用到)

        iszero方法:判斷是否為初始值

        reset方法:重置累加器中的值

        copy方法:拷貝累加器

spark中累加器的執(zhí)行流程:

          首先有幾個task,spark engine就調(diào)用copy方法拷貝幾個累加器(不注冊的),然后在各個task中進(jìn)行累加(注意在此過程中,被最初注冊的累加器的值是不變的),執(zhí)行最后將調(diào)用merge方法和各個task的結(jié)果累計器進(jìn)行合并(此時被注冊的累加器是初始值)

總結(jié)

以上就是本文關(guān)于Spark自定義累加器的使用實例詳解的全部內(nèi)容,希望對大家有所幫助。有什么問題可以隨時留言,小編會及時回復(fù)大家的。

原文鏈接:http://www.cnblogs.com/zhangweilun/p/6684776.html

延伸 · 閱讀

精彩推薦
Weibo Article 1 Weibo Article 2 Weibo Article 3 Weibo Article 4 Weibo Article 5 Weibo Article 6 Weibo Article 7 Weibo Article 8 Weibo Article 9 Weibo Article 10 Weibo Article 11 Weibo Article 12 Weibo Article 13 Weibo Article 14 Weibo Article 15 Weibo Article 16 Weibo Article 17 Weibo Article 18 Weibo Article 19 Weibo Article 20 Weibo Article 21 Weibo Article 22 Weibo Article 23 Weibo Article 24 Weibo Article 25 Weibo Article 26 Weibo Article 27 Weibo Article 28 Weibo Article 29 Weibo Article 30 Weibo Article 31 Weibo Article 32 Weibo Article 33 Weibo Article 34 Weibo Article 35 Weibo Article 36 Weibo Article 37 Weibo Article 38 Weibo Article 39 Weibo Article 40
主站蜘蛛池模板: 午夜精品久久久久久久久久久久久 | www.欧美日韩 | 国产一区二区久久久 | av一级久久 | 精品国产乱码久久久久久1区2区 | 懂色一区二区三区av片 | 日韩在线小视频 | 国产成人精品一区二 | 午夜视频在线网站 | 成人欧美一区二区三区在线观看 | 一级α片免费看 | 欧美福利 | 亚洲激情视频在线播放 | 日韩精品一二三区 | 最好看的2019年中文在线观看 | 欧美在线网站 | 婷婷免费视频 | 精品在线一区二区 | 久久成人一区 | 日本成人网址 | 91国内精品久久 | 国产成人精品免费 | 北条麻妃99精品青青久久 | 日韩精品一区二区三区在线播放 | 天天拍天天草 | 午夜精品久久久久久久久久久久 | 青青国产视频 | 国产欧美一二三区在线粉嫩 | 精品无码久久久久久久动漫 | 日韩精品小视频 | 亚洲男人第一天堂 | 国产美女自拍视频 | 精品国产一区二区三区久久久蜜月 | 91视频一区二区 | 中文字幕在线观看一区二区三区 | 欧美视频在线一区 | 久久久久久久国产精品 | 操av在线| 色综合天天综合网国产成人网 | www.一区| 欧美日韩精品免费观看 |