
Source Code Analysis of the MapTask Shuffle Phase

2021-07-01 14:07 · Java Tutorial

Today I'd like to share an analysis of the MapTask-phase shuffle source code. I found the material quite good and of solid reference value, so I'm passing it on; readers who need it can follow along.

1. The Collection Phase

In the Mapper, a call to context.write(key, value) actually invokes the write() method of the proxy NewOutputCollector:

public void write(KEYOUT key, VALUEOUT value
                  ) throws IOException, InterruptedException {
  output.write(key, value);
}

What actually runs is MapOutputBuffer.collect(); before collecting, the Partitioner is invoked to compute the partition number of each key-value pair:

@Override
public void write(K key, V value) throws IOException, InterruptedException {
  collector.collect(key, value,
                    partitioner.getPartition(key, value, partitions));
}

2. Creating the NewOutputCollector Object

@SuppressWarnings("unchecked")
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                   JobConf job,
                   TaskUmbilicalProtocol umbilical,
                   TaskReporter reporter
                   ) throws IOException, ClassNotFoundException {
  // Create the buffer object that actually collects the key-value pairs
  collector = createSortingCollector(job, reporter);
  // Get the total number of partitions
  partitions = jobContext.getNumReduceTasks();
  if (partitions > 1) {
    partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
      ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
  } else {
    // Default case: create an anonymous inner class that assigns
    // every key-value pair to partition 0
    partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
      @Override
      public int getPartition(K key, V value, int numPartitions) {
        return partitions - 1;
      }
    };
  }
}
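For reference, the partition count read above is driven entirely by the driver's reduce-task setting. A minimal sketch (the class name is made up for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class PartitionCountDemo {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "partition-count-demo");
    // With 3 reduce tasks, NewOutputCollector sees partitions == 3 and
    // instantiates the configured Partitioner instead of the anonymous one.
    job.setNumReduceTasks(3);
  }
}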

3. Creating the Ring Buffer Object

@SuppressWarnings("unchecked")
private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
        createSortingCollector(JobConf job, TaskReporter reporter)
  throws IOException, ClassNotFoundException {
  MapOutputCollector.Context context =
    new MapOutputCollector.Context(this, job, reporter);
  // Read mapreduce.job.map.output.collector.class from the job
  // configuration; if it is not set, fall back to MapOutputBuffer.class
  Class<?>[] collectorClasses = job.getClasses(
    JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
  int remainingCollectors = collectorClasses.length;
  Exception lastException = null;
  for (Class clazz : collectorClasses) {
    try {
      if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
        throw new IOException("Invalid output collector class: " + clazz.getName() +
          " (does not implement MapOutputCollector)");
      }
      Class<? extends MapOutputCollector> subclazz =
        clazz.asSubclass(MapOutputCollector.class);
      LOG.debug("Trying map output collector class: " + subclazz.getName());
      // Create the buffer object
      MapOutputCollector<KEY, VALUE> collector =
        ReflectionUtils.newInstance(subclazz, job);
      // After creating the buffer object, initialize it
      collector.init(context);
      LOG.info("Map output collector class = " + collector.getClass().getName());
      return collector;
    } catch (Exception e) {
      String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
      if (--remainingCollectors > 0) {
        msg += " (" + remainingCollectors + " more collector(s) to try)";
      }
      lastException = e;
      LOG.warn(msg, e);
    }
  }
  throw new IOException("Initialization of all the collectors failed. " +
    "Error in last collector was :" + lastException.getMessage(), lastException);
}
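As the lookup above suggests, the collector implementation is pluggable through mapreduce.job.map.output.collector.class, and the property accepts a comma-separated fallback list. A minimal sketch (com.example.MyCollector is a hypothetical MapOutputCollector implementation):

import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
// Try the custom collector first; if its init() throws, the loop above
// falls back to the stock MapOutputBuffer.
conf.set("mapreduce.job.map.output.collector.class",
    "com.example.MyCollector,org.apache.hadoop.mapred.MapTask$MapOutputBuffer");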

4. Initializing MapOutputBuffer, the Ring Buffer

@SuppressWarnings("unchecked")
public void init(MapOutputCollector.Context context
                ) throws IOException, ClassNotFoundException {
  job = context.getJobConf();
  reporter = context.getReporter();
  mapTask = context.getMapTask();
  mapOutputFile = mapTask.getMapOutputFile();
  sortPhase = mapTask.getSortPhase();
  spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
  // The total number of partitions, determined by the number of ReduceTasks
  partitions = job.getNumReduceTasks();
  rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

  //sanity checks
  // Read mapreduce.map.sort.spill.percent from the configuration; defaults to 0.8
  final float spillper =
    job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
  // Read mapreduce.task.io.sort.mb; defaults to 100 MB
  final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
  indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                     INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
  if (spillper > (float)1.0 || spillper <= (float)0.0) {
    throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
        "\": " + spillper);
  }
  if ((sortmb & 0x7FF) != sortmb) {
    throw new IOException(
        "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
  }
  // The sorter used on key-value pairs before a spill: QuickSort by default;
  // only the indices are sorted, not the records themselves
  sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
        QuickSort.class, IndexedSorter.class), job);

  // buffers and accounting
  int maxMemUsage = sortmb << 20;
  maxMemUsage -= maxMemUsage % METASIZE;
  // Holds the serialized key-value pairs
  kvbuffer = new byte[maxMemUsage];
  bufvoid = kvbuffer.length;
  // Holds the accounting information of each key-value pair:
  // partition number, indices, etc.
  kvmeta = ByteBuffer.wrap(kvbuffer)
     .order(ByteOrder.nativeOrder())
     .asIntBuffer();
  setEquator(0);
  bufstart = bufend = bufindex = equator;
  kvstart = kvend = kvindex;

  maxRec = kvmeta.capacity() / NMETA;
  softLimit = (int)(kvbuffer.length * spillper);
  bufferRemaining = softLimit;
  LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
  LOG.info("soft limit at " + softLimit);
  LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
  LOG.info("kvstart = " + kvstart + "; length = " + maxRec);

  // k/v serialization
  // The comparator used by the quick sort; sorting looks at the key only!
  comparator = job.getOutputKeyComparator();
  // The serializers for the map output keys and values
  keyClass = (Class<K>)job.getMapOutputKeyClass();
  valClass = (Class<V>)job.getMapOutputValueClass();
  serializationFactory = new SerializationFactory(job);
  keySerializer = serializationFactory.getSerializer(keyClass);
  keySerializer.open(bb);
  valSerializer = serializationFactory.getSerializer(valClass);
  valSerializer.open(bb);

  // output counters
  mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
  mapOutputRecordCounter =
    reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
  fileOutputByteCounter = reporter
      .getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);

  // compression
  // Spills to disk may use a compression format; look up the configured codec
  if (job.getCompressMapOutput()) {
    Class<? extends CompressionCodec> codecClass =
      job.getMapOutputCompressorClass(DefaultCodec.class);
    codec = ReflectionUtils.newInstance(codecClass, job);
  } else {
    codec = null;
  }

  // combiner
  // Look up the Combiner component
  final Counters.Counter combineInputCounter =
    reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
  combinerRunner = CombinerRunner.create(job, getTaskID(),
                                         combineInputCounter,
                                         reporter, null);
  if (combinerRunner != null) {
    final Counters.Counter combineOutputCounter =
      reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
    combineCollector = new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
  } else {
    combineCollector = null;
  }
  spillInProgress = false;
  minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
  // The spill thread runs in the background as a daemon;
  // spilling and collecting happen on two separate threads!
  spillThread.setDaemon(true);
  spillThread.setName("SpillThread");
  spillLock.lock();
  try {
    // Start the spill thread
    spillThread.start();
    while (!spillThreadRunning) {
      spillDone.await();
    }
  } catch (InterruptedException e) {
    throw new IOException("Spill thread failed to initialize", e);
  } finally {
    spillLock.unlock();
  }
  if (sortSpillException != null) {
    throw new IOException("Spill thread failed to initialize",
        sortSpillException);
  }
}
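The compression branch in init() honours the standard map-output compression settings. A minimal sketch for turning it on (SnappyCodec is assumed to be available via the native libraries; DefaultCodec remains the fallback in init()):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;

Configuration conf = new Configuration();
// Compress spill files and the final map output
conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass("mapreduce.map.output.compress.codec",
    SnappyCodec.class, CompressionCodec.class);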

5. Obtaining the Partitioner

The Partitioner class is read from mapreduce.job.partitioner.class; if nothing is specified, HashPartitioner.class is used.

In other words, when there is more than one ReduceTask and no partitioner component has been set, HashPartitioner does the partitioning:

@SuppressWarnings("unchecked")
public Class<? extends Partitioner<?,?>> getPartitionerClass()
   throws ClassNotFoundException {
  return (Class<? extends Partitioner<?,?>>)
    conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
}
public class HashPartitioner<K, V> extends Partitioner<K, V> {
  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}
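For contrast, here is a minimal custom Partitioner sketch (the class name and routing rule are made up for illustration): keys starting with a digit go to partition 0, everything else is spread over the remaining partitions by hash. It would be registered in the driver with job.setPartitionerClass(FirstCharPartitioner.class).

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FirstCharPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numReduceTasks) {
    String s = key.toString();
    if (numReduceTasks == 1 || (!s.isEmpty() && Character.isDigit(s.charAt(0)))) {
      return 0;
    }
    // Stay inside the required range: 0 <= partition < numReduceTasks
    return 1 + (s.hashCode() & Integer.MAX_VALUE) % (numReduceTasks - 1);
  }
}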

Partition numbers are constrained to the range 0 <= partition < total number of partitions (the number of ReduceTasks):

if (partition < 0 || partition >= partitions) {
  throw new IOException("Illegal partition for " + key + " (" +
      partition + ")");
}

6. The MapTask Shuffle Flow

① In map(), context.write() is called.

② This delegates to MapOutputBuffer's collect().

  •  The partitioner component is invoked to compute the partition number of the current key-value pair.

③ The key-value pair is collected into the MapOutputBuffer.

  •  Once the spill threshold is exceeded, a spill thread starts spilling in the background (the threshold is configurable; see the tuning sketch after this list).

④ Before a spill, the key-value pairs are sorted by partition number using the quick sort algorithm, so that pairs with the same partition number end up together.

  •  The sort does not move key-value pairs in memory; it only reorders their indices.

⑤ The spill begins: following the sorted indices, the records are written to a temporary spill file.

  •  If no Combiner is defined, the records are spilled as-is.
  •  If a Combiner is defined, CombinerRunner.combine() processes the key-value pairs before they are spilled.

⑥ Over multiple spills, each spill produces one temporary file.

⑦ Finally, flush() is called to spill the remaining key-value pairs.

⑧ mergeParts: the results of all spills are merged into one final file.

  •  Before the merge, a merge sort is performed so that each partition in the combined file is also sorted.
  •  If a Combiner is defined, it runs once more, provided there are at least 3 spill files.
  •  Otherwise the segments are merged directly.

⑨ The end result is a single final file, split by partition number into sections whose key-value pairs are already sorted, waiting for the ReduceTasks to copy the data of their respective partitions.
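The buffer size and spill threshold mentioned in step ③ are ordinary job settings, read in init() above. A minimal tuning sketch (the values are illustrative, not recommendations):

import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
// Ring buffer size in MB (mapreduce.task.io.sort.mb, default 100)
conf.setInt("mapreduce.task.io.sort.mb", 200);
// Buffer fill fraction that triggers a background spill
// (mapreduce.map.sort.spill.percent, default 0.80)
conf.setFloat("mapreduce.map.sort.spill.percent", 0.90f);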

7. Combiner

A Combiner is really just a Reducer type:

Class<? extends Reducer<K,V,K,V>> cls =
  (Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();

When the Combiner runs:

In the MapTask:

  •  ① Before each spill, if a Combiner is specified, it runs.
  •  ② When the spill segments are merged into one final file, the Combiner runs again, provided there are at least 3 segments.

In the ReduceTask:

③ While running, a ReduceTask starts the shuffle stage to copy the data produced by the MapTasks.

  •  After being copied, the data enters the shuffle working memory, where it is merged and sorted.
  •  If there is too much data and memory runs short, part of the data is spilled to disk.
  •  Whenever such a spill happens, the Combiner runs once more.

Given that a Combiner is set, ① is guaranteed to run; ② and ③ only run when their conditions hold!
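Since a Combiner is just a Reducer, wiring one up is a single driver call. A minimal sketch using the stock word-count reducer (org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer ships with Hadoop; it stands in here for any suitable reducer):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

public class CombinerWiringDemo {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "combiner-demo");
    // Because a combiner may run several times or not at all,
    // it must be commutative and associative, like a sum.
    job.setCombinerClass(IntSumReducer.class);
    // The same class often doubles as the real reducer.
    job.setReducerClass(IntSumReducer.class);
  }
}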

Summary

That's all for this article. I hope the content is of some reference value for your study or work. Thank you for supporting 服務器之家.

Original article: https://blog.csdn.net/qq_43193797/article/details/86097451
