Deadlock caused by routing Spark logs to Kafka via kafka-appender

2020-09-12 17:22 · 民工哥 · Programming

This article walks through a deadlock triggered by using the log4j kafka-appender to ship Spark job logs to Kafka. The problem is reproduced and analyzed with concrete configuration, code, and stack traces, which should be a useful reference if you run into the same issue.

While using the log4j kafka-appender to collect the logs of Spark jobs, tasks submitted to YARN stayed in the ACCEPTED state, never reached RUNNING, and timed out after two retries. At first this looked like a YARN resource shortage, but the problem persisted even with plenty of free resources, and it reproduced almost every time.

The Spark logging was initially configured to ship to Kafka like this:

log4j.rootCategory=INFO, console, kafka
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS} %p %c{1}: [${log4j.pipelineId}] %m%n
 
# Kafka appender
log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
# Set Kafka topic and brokerList
log4j.appender.kafka.topic=yarn_spark_log
log4j.appender.kafka.brokerList=localhost:9092
log4j.appender.kafka.compressionType=none
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.maxBlockMs=10
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS} %p %c{1}: [${log4j.pipelineId}] %m

Here org.apache.kafka.log4jappender.KafkaLog4jAppender is used and, by default, routes all log output to Kafka. The appender is maintained by the Kafka project itself, so its stability should be reasonably trustworthy.
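To make the later deadlock analysis easier to follow, here is a highly simplified sketch of what a Kafka-backed log4j appender looks like. This is not the real KafkaLog4jAppender source and the class name SimpleKafkaAppender is hypothetical; it only shows the general shape: the synchronized AppenderSkeleton.doAppend() calls append(), which hands the formatted event to an internal KafkaProducer.

// Illustrative sketch only, not the actual KafkaLog4jAppender implementation.
public class SimpleKafkaAppender extends org.apache.log4j.AppenderSkeleton {
  private org.apache.kafka.clients.producer.KafkaProducer<byte[], byte[]> producer;
  private String topic;

  @Override
  protected void append(org.apache.log4j.spi.LoggingEvent event) {
    // Called from the synchronized AppenderSkeleton.doAppend(), so the appender's
    // monitor is held for the whole duration of the producer.send() call.
    byte[] message = this.layout.format(event).getBytes();
    producer.send(new org.apache.kafka.clients.producer.ProducerRecord<>(topic, message));
  }

  @Override
  public void close() { producer.close(); }

  @Override
  public boolean requiresLayout() { return true; }
}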

Locating the problem

Once the problem was spotted, removing the Kafka output rule made it disappear, which narrowed the issue down to log shipping to Kafka. Separate tests confirmed that the target Kafka cluster itself was healthy, which made the behaviour all the more puzzling.

The YARN ResourceManager log showed the following timeout:

2020-05-07 21:49:48,230 INFO org.apache.hadoop.yarn.util.AbstractLivelinessMonitor: Expired:appattempt_1578970174552_3204_000002 Timed out after 600 secs
2020-05-07 21:49:48,230 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: Updating application attempt appattempt_1578970174552_3204_000002 with final state: FAILED, and exit status: -1000
2020-05-07 21:49:48,231 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: appattempt_1578970174552_3204_000002 State change from LAUNCHED to FINAL_SAVING on event = EXPIRE

This shows that YARN did accept the application, but the attempt never actually got going. In Spark terms, only the driver started; no executors were ever launched.
Looking at the driver log, output stopped at one point and never advanced. Comparing a successful run with a stuck one, the log gets stuck around these lines:

2020/05/07 19:37:10.324 INFO SecurityManager: Changing view acls to: yarn,root
2020/05/07 19:37:10.344 INFO Metadata: Cluster ID: 6iG6WHA2SoK7FfgGgWHt_A

In the stuck case only the SecurityManager line is printed; the Metadata line never appears.
The guess was that the Metadata line is printed by kafka-client itself, since in this context only YARN, Spark, and kafka-client could plausibly emit it.

In the kafka-client 2.2.0 source, the line is emitted here:

public synchronized void update(MetadataResponse metadataResponse, long now) {
  ...
 
  String newClusterId = cache.cluster().clusterResource().clusterId();
  if (!Objects.equals(previousClusterId, newClusterId)) {
    log.info("Cluster ID: {}", newClusterId);
  }
  ...
}

The synchronized keyword immediately raised the suspicion of a deadlock, so the next step was to analyze it with jstack:

When a Spark job runs on YARN, the driver process is called ApplicationMaster and the executor processes are called CoarseGrainedExecutorBackend. So while reproducing the problem, the first step was to find which node the driver ended up on, then quickly dump its stack with jstack -F <pid>.
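(As an aside, the same Java-level deadlock can also be detected from inside a running JVM with ThreadMXBean. This was not part of the original troubleshooting; it is just a minimal, self-contained sketch of the idea, with the class name DeadlockProbe made up for illustration:)

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class DeadlockProbe {
  public static void main(String[] args) {
    ThreadMXBean mx = ManagementFactory.getThreadMXBean();
    long[] ids = mx.findDeadlockedThreads();      // null when no deadlock is present
    if (ids == null) {
      System.out.println("No deadlock detected");
      return;
    }
    for (ThreadInfo info : mx.getThreadInfo(ids, Integer.MAX_VALUE)) {
      System.out.println(info);                   // prints thread name, owned lock, and stack frames
    }
  }
}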

jstack did not disappoint and reported a deadlock. The output is quoted here at some length:

[root@node1 ~]# jstack 20136
20136: Unable to open socket file: target process not responding or HotSpot VM not loaded
The -F option can be used when the target process is not responding
[root@node1 ~]# jstack -F 20136
Attaching to process ID 20136, please wait...
Debugger attached successfully.
Server compiler detected.
JVM version is 25.231-b11
Deadlock Detection:
 
Found one Java-level deadlock:
=============================
 
"kafka-producer-network-thread | producer-1":
 waiting to lock Monitor@0x00000000025fcc48 (Object@0x00000000ed680b60, a org/apache/kafka/log4jappender/KafkaLog4jAppender),
 which is held by "main"
"main":
 waiting to lock Monitor@0x00007fec9dbde038 (Object@0x00000000ee44de38, a org/apache/kafka/clients/Metadata),
 which is held by "kafka-producer-network-thread | producer-1"
 
Found a total of 1 deadlock.
 
Thread 20157: (state = BLOCKED)
 - org.apache.log4j.AppenderSkeleton.doAppend(org.apache.log4j.spi.LoggingEvent) @bci=0, line=231 (Interpreted frame)
 - org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(org.apache.log4j.spi.LoggingEvent) @bci=41, line=66 (Interpreted frame)
 - org.apache.log4j.Category.callAppenders(org.apache.log4j.spi.LoggingEvent) @bci=26, line=206 (Interpreted frame)
 - org.apache.log4j.Category.forcedLog(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=14, line=391 (Interpreted frame)
 - org.apache.log4j.Category.log(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=34, line=856 (Interpreted frame)
 - org.slf4j.impl.Log4jLoggerAdapter.info(java.lang.String, java.lang.Object) @bci=34, line=324 (Interpreted frame)
 - org.apache.kafka.clients.Metadata.update(org.apache.kafka.common.requests.MetadataResponse, long) @bci=317, line=365 (Interpreted frame)
 - org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(org.apache.kafka.common.requests.RequestHeader, long, org.apache.kafka.common.requests.MetadataResponse) @bci=184, line=1031 (Interpreted frame)
 - org.apache.kafka.clients.NetworkClient.handleCompletedReceives(java.util.List, long) @bci=215, line=822 (Interpreted frame)
 - org.apache.kafka.clients.NetworkClient.poll(long, long) @bci=132, line=544 (Interpreted frame)
 - org.apache.kafka.clients.producer.internals.Sender.run(long) @bci=227, line=311 (Interpreted frame)
 - org.apache.kafka.clients.producer.internals.Sender.run() @bci=28, line=235 (Interpreted frame)
 - java.lang.Thread.run() @bci=11, line=748 (Interpreted frame)
 
 
Thread 20150: (state = BLOCKED)
 
 
Thread 20149: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - java.lang.ref.ReferenceQueue.remove(long) @bci=59, line=144 (Interpreted frame)
 - java.lang.ref.ReferenceQueue.remove() @bci=2, line=165 (Interpreted frame)
 - java.lang.ref.Finalizer$FinalizerThread.run() @bci=36, line=216 (Interpreted frame)
 
 
Thread 20148: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - java.lang.Object.wait() @bci=2, line=502 (Interpreted frame)
 - java.lang.ref.Reference.tryHandlePending(boolean) @bci=54, line=191 (Interpreted frame)
 - java.lang.ref.Reference$ReferenceHandler.run() @bci=1, line=153 (Interpreted frame)
 
 
Thread 20137: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - org.apache.kafka.clients.Metadata.awaitUpdate(int, long) @bci=63, line=261 (Interpreted frame)
 - org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(java.lang.String, java.lang.Integer, long) @bci=160, line=983 (Interpreted frame)
 - org.apache.kafka.clients.producer.KafkaProducer.doSend(org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback) @bci=19, line=860 (Interpreted frame)
 - org.apache.kafka.clients.producer.KafkaProducer.send(org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback) @bci=12, line=840 (Interpreted frame)
 - org.apache.kafka.clients.producer.KafkaProducer.send(org.apache.kafka.clients.producer.ProducerRecord) @bci=3, line=727 (Interpreted frame)
 - org.apache.kafka.log4jappender.KafkaLog4jAppender.append(org.apache.log4j.spi.LoggingEvent) @bci=69, line=283 (Interpreted frame)
 - org.apache.log4j.AppenderSkeleton.doAppend(org.apache.log4j.spi.LoggingEvent) @bci=106, line=251 (Interpreted frame)
 - org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(org.apache.log4j.spi.LoggingEvent) @bci=41, line=66 (Interpreted frame)
 - org.apache.log4j.Category.callAppenders(org.apache.log4j.spi.LoggingEvent) @bci=26, line=206 (Interpreted frame)
 - org.apache.log4j.Category.forcedLog(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=14, line=391 (Interpreted frame)
 - org.apache.log4j.Category.log(java.lang.String, org.apache.log4j.Priority, java.lang.Object, java.lang.Throwable) @bci=34, line=856 (Interpreted frame)
 - org.slf4j.impl.Log4jLoggerAdapter.info(java.lang.String) @bci=12, line=305 (Interpreted frame)
 - org.apache.spark.internal.Logging$class.logInfo(org.apache.spark.internal.Logging, scala.Function0) @bci=29, line=54 (Interpreted frame)
 - org.apache.spark.SecurityManager.logInfo(scala.Function0) @bci=2, line=44 (Interpreted frame)
 - org.apache.spark.SecurityManager.setViewAcls(scala.collection.immutable.Set, java.lang.String) @bci=36, line=139 (Interpreted frame)
 - org.apache.spark.SecurityManager.<init>(org.apache.spark.SparkConf, scala.Option) @bci=158, line=81 (Interpreted frame)
 - org.apache.spark.deploy.yarn.ApplicationMaster.<init>(org.apache.spark.deploy.yarn.ApplicationMasterArguments) @bci=85, line=70 (Interpreted frame)
 - org.apache.spark.deploy.yarn.ApplicationMaster$.main(java.lang.String[]) @bci=25, line=802 (Interpreted frame)
 - org.apache.spark.deploy.yarn.ApplicationMaster.main(java.lang.String[]) @bci=4 (Interpreted frame)

At this point the deadlock was confirmed: the driver stalls right at startup, so naturally no executors can ever be submitted.
The detailed analysis of the deadlock comes later; the immediate question was how to work around it. Intuitively it should be enough to stop kafka-client's own logs from being routed back into Kafka, and an experiment confirmed exactly that: if only org.apache.spark logs are sent to Kafka, the job runs normally (see the configuration sketch below).
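A minimal configuration sketch of that workaround follows. It is not taken verbatim from the original article; the topic, broker list, pipelineId placeholder, and conversion pattern simply reuse the values from the configuration shown earlier, so adjust them to your own environment. The key point is that the kafka appender is attached only to the org.apache.spark logger, so kafka-client's own log lines never re-enter the Kafka appender.

# Root logger goes to the console only
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS} %p %c{1}: [${log4j.pipelineId}] %m%n

# Only Spark's own packages are additionally routed to Kafka
log4j.logger.org.apache.spark=INFO, kafka
log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.topic=yarn_spark_log
log4j.appender.kafka.brokerList=localhost:9092
log4j.appender.kafka.syncSend=false
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS} %p %c{1}: [${log4j.pipelineId}] %m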

Root cause analysis

From the stack dump, the two threads involved in the deadlock are:

  • kafka-client's internal network thread (kafka-producer-network-thread)
  • Spark's main entry thread (main)

Both threads are in fact stuck while writing a log line. Looking at the stacks, both of them end up synchronizing on the same log appender object, which is the kafka-appender; the kafka-appender in turn holds the kafka-client producer and, inside it, the Metadata object. On top of that, log4j's doAppend is marked synchronized for thread safety:

public synchronized void doAppend(LoggingEvent event) {
  if(closed) {
   LogLog.error("Attempted to append to closed appender named ["+name+"].");
   return;
  }
  
  if(!isAsSevereAsThreshold(event.level)) {
   return;
  }
 
  Filter f = this.headFilter;
  
  FILTER_LOOP:
  while(f != null) {
   switch(f.decide(event)) {
   case Filter.DENY: return;
   case Filter.ACCEPT: break FILTER_LOOP;
   case Filter.NEUTRAL: f = f.next;
   }
  }
  
  this.append(event); 
 }

So this is how events unfold:

  • The main thread tries to log a line and first enters the synchronized doAppend, i.e. it acquires the kafka-appender's lock.
  • Inside the appender, kafka-client is called to send the log record to Kafka. As Thread 20137's stack shows, execution reaches Metadata.awaitUpdate (also a synchronized method), whose internal wait needs the metadata lock (see https://github.com/apache/kaf...).
  • At exactly this moment, the kafka-producer-network-thread happens to be in the "Cluster ID" logging path described above (Metadata.update is synchronized too), which means that thread is holding the metadata object's lock.
  • To print that log line, kafka-producer-network-thread must likewise go through the synchronized doAppend, i.e. it needs the kafka-appender's lock.

[Figure: main thread holding the appender (log) lock and waiting for the metadata lock, while kafka-producer-network-thread holds the metadata lock and waits for the appender lock]

As the figure above shows, main-thread holds the log (appender) object's lock and is asking for the metadata object's lock, while kafka-producer-network-thread holds the metadata object's lock and is asking for the log object's lock, so the two deadlock. The pattern is easy to reproduce in isolation, as the sketch below shows.
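The following is a minimal, self-contained sketch (hypothetical class and lock names, not code from Spark, Kafka, or log4j) of the same lock-ordering deadlock: one thread takes the appender lock and then needs the metadata lock, while the other thread does the reverse. Running it and attaching jstack reports one Java-level deadlock, just like in the output above.

// DeadlockSketch.java - reproduces the lock-ordering pattern described above.
public class DeadlockSketch {
  private static final Object appenderLock = new Object(); // stands in for the KafkaLog4jAppender monitor
  private static final Object metadataLock = new Object(); // stands in for the kafka-client Metadata monitor

  public static void main(String[] args) {
    // "main"-like thread: doAppend() -> producer.send() -> Metadata.awaitUpdate()
    Thread mainLike = new Thread(() -> {
      synchronized (appenderLock) {            // enters the synchronized doAppend
        sleepQuietly(100);                     // give the other thread time to grab metadataLock
        synchronized (metadataLock) {          // blocks here forever
          System.out.println("main-like thread finished");
        }
      }
    }, "main-like");

    // network-thread-like: Metadata.update() -> log.info("Cluster ID ...") -> doAppend()
    Thread networkLike = new Thread(() -> {
      synchronized (metadataLock) {            // enters the synchronized update()
        sleepQuietly(100);
        synchronized (appenderLock) {          // blocks here forever
          System.out.println("network-like thread finished");
        }
      }
    }, "kafka-producer-network-thread-like");

    mainLike.start();
    networkLike.start();
  }

  private static void sleepQuietly(long ms) {
    try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
  }
}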

Summary

That concludes this look at the deadlock caused by routing Spark logs to Kafka through kafka-appender. The takeaway: do not let the Kafka client's own log output loop back through a Kafka-backed appender, otherwise the synchronized appender and the synchronized Metadata methods can end up waiting on each other.

Original article: https://segmentfault.com/a/1190000022577776
