国产片侵犯亲女视频播放_亚洲精品二区_在线免费国产视频_欧美精品一区二区三区在线_少妇久久久_在线观看av不卡

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|JavaScript|易語言|

服務器之家 - 編程語言 - Java教程 - Kafka源碼系列教程之刪除topic

Kafka源碼系列教程之刪除topic

2021-05-26 13:32浪尖 Java教程

這篇文章主要給大家介紹了關于Kafka源碼系列教程之刪除topic的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧

前言

apache kafka發源于linkedin,于2011年成為apache的孵化項目,隨后于2012年成為apache的主要項目之一。kafka使用scala和java進行編寫。apache kafka是一個快速、可擴展的、高吞吐、可容錯的分布式發布訂閱消息系統。kafka具有高吞吐量、內置分區、支持數據副本和容錯的特性,適合在大規模消息處理場景中使用。

本文依然是以kafka0.8.2.2為例講解

一,如何刪除一個topic

刪除一個topic有兩個關鍵點:

1,配置刪除參數

delete.topic.enable這個broker參數配置為true。

2,執行

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name

假如不配置刪除參數為true的話,topic其實并沒有被清除,只是被標記為刪除。此時,估計一般人的做法是刪除topic在zookeeper的信息和日志,其實這個操作并不會清除kafkabroker內存的topic數據。所以,此時最佳的策略是配置刪除參數為true然后,重啟kafka。

二,重要的類介紹

1,partitionstatemachine

該類代表分區的狀態機。決定者分區的當前狀態,和狀態轉移。四種狀態

  • nonexistentpartition
  • newpartition
  • onlinepartition
  • offlinepartition

2,replicamanager

負責管理當前機器的所有副本,處理讀寫、刪除等具體動作。

讀寫:寫獲取partition對象,再獲取replica對象,再獲取log對象,采用其管理的segment對象將數據寫入、讀出。

3,replicastatemachine

副本的狀態機。決定者副本的當前狀態和狀態之間的轉移。一個副本總共可以處于一下幾種狀態的一種
newreplica:crontroller在分區重分配的時候可以創建一個新的副本。只能接受變為follower的請求。前狀態可以是nonexistentreplica

onlinereplica:新啟動的分區,能接受變為leader或者follower請求。前狀態可以是newreplica, onlinereplica or offlinereplica

offlinereplica:死亡的副本處于這種狀態。前狀態可以是newreplica, onlinereplica

replicadeletionstarted:分本刪除開始的時候處于這種狀態,前狀態是offlinereplica

replicadeletionsuccessful:副本刪除成功。前狀態是replicadeletionstarted

replicadeletionineligible:刪除失敗的時候處于這種狀態。前狀態是replicadeletionstarted

nonexistentreplica:副本成功刪除之后處于這種狀態,前狀態是replicadeletionsuccessful

4,topicdeletionmanager

該類管理著topic刪除的狀態機

1),topiccommand通過創建/admin/delete_topics/<topic>,來發布topic刪除命令。

2),controller監聽/admin/delete_topic子節點變動,開始分別刪除topic

3),controller有個后臺線程負責刪除topic

三,源碼徹底解析topic的刪除過程

此處會分四個部分:

a),客戶端執行刪除命令作用

b),不配置delete.topic.enable整個流水的源碼

c),配置了delete.topic.enable整個流水的源碼

d),手動刪除zk上topic信息和磁盤數據

1,客戶端執行刪除命令

?
1
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name

進入kafka-topics.sh我們會看到

?
1
exec $(dirname $0)/kafka-run-class.sh kafka.admin.topiccommand $@

進入topiccommand里面,main方法里面

?
1
2
else if(opts.options.has(opts.deleteopt))
 deletetopic(zkclient, opts)

實際內容是

?
1
2
3
4
5
6
7
val topics = gettopics(zkclient, opts)
if (topics.length == 0) {
 println("topic %s does not exist".format(opts.options.valueof(opts.topicopt)))
}
topics.foreach { topic =>
 try {
 zkutils.createpersistentpath(zkclient, zkutils.getdeletetopicpath(topic))

在"/admin/delete_topics"目錄下創建了一個topicname的節點。

2,假如不配置delete.topic.enable整個流水是

總共有兩處listener會響應:

a),topicchangelistener

b),deletetopicslistener

使用topic的刪除命令刪除一個topic的話,指揮觸發deletetopiclistener。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
var topicstobedeleted = {
 import javaconversions._
 (children: buffer[string]).toset
}
val nonexistenttopics = topicstobedeleted.filter(t => !controllercontext.alltopics.contains(t))
topicstobedeleted --= nonexistenttopics
if(topicstobedeleted.size > 0) {
 info("starting topic deletion for topics " + topicstobedeleted.mkstring(","))
 // mark topic ineligible for deletion if other state changes are in progress
 topicstobedeleted.foreach { topic =>
 val preferredreplicaelectioninprogress =
  controllercontext.partitionsundergoingpreferredreplicaelection.map(_.topic).contains(topic)
 val partitionreassignmentinprogress =
  controllercontext.partitionsbeingreassigned.keyset.map(_.topic).contains(topic)
 if(preferredreplicaelectioninprogress || partitionreassignmentinprogress)
  controller.deletetopicmanager.marktopicineligiblefordeletion(set(topic))
 }
 // add topic to deletion list
 controller.deletetopicmanager.enqueuetopicsfordeletion(topicstobedeleted)
}

由于都會判斷delete.topic.enable是否為true,假如不為true就不會執行,為true就進入執行

?
1
2
controller.deletetopicmanager.marktopicineligiblefordeletion(set(topic))
controller.deletetopicmanager.enqueuetopicsfordeletion(topicstobedeleted)

3,delete.topic.enable配置為true

此處與步驟2的區別,就是那兩個處理函數。

?
1
2
controller.deletetopicmanager.marktopicineligiblefordeletion(set(topic))
controller.deletetopicmanager.enqueuetopicsfordeletion(topicstobedeleted)

marktopicineligiblefordeletion函數的處理為

?
1
2
3
4
5
6
if(isdeletetopicenabled) {
 val newtopicstohaltdeletion = topicstobedeleted & topics
 topicsineligiblefordeletion ++= newtopicstohaltdeletion
 if(newtopicstohaltdeletion.size > 0)
 info("halted deletion of topics %s".format(newtopicstohaltdeletion.mkstring(",")))
}

主要是停止刪除topic,假如存儲以下三種情況

* halt delete topic if -
* 1. replicas being down
* 2. partition reassignment in progress for some partitions of the topic
* 3. preferred replica election in progress for some partitions of the topic

enqueuetopicsfordeletion主要作用是更新刪除topic的集合,并激活topicdeletethread

?
1
2
3
4
5
6
7
def enqueuetopicsfordeletion(topics: set[string]) {
 if(isdeletetopicenabled) {
 topicstobedeleted ++= topics
 partitionstobedeleted ++= topics.flatmap(controllercontext.partitionsfortopic)
 resumetopicdeletionthread()
 }
}

在刪除線程deletetopicsthread的dowork方法中

?
1
2
3
4
5
6
7
topicsqueuedfordeletion.foreach { topic =>
// if all replicas are marked as deleted successfully, then topic deletion is done
 if(controller.replicastatemachine.areallreplicasfortopicdeleted(topic)) {
 // clear up all state for this topic from controller cache and zookeeper
 completedeletetopic(topic)
 info("deletion of topic %s successfully completed".format(topic))
 }

進入completedeletetopic方法中

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// deregister partition change listener on the deleted topic. this is to prevent the partition change listener
// firing before the new topic listener when a deleted topic gets auto created
partitionstatemachine.deregisterpartitionchangelistener(topic)
val replicasfordeletedtopic = controller.replicastatemachine.replicasinstate(topic, replicadeletionsuccessful)
// controller will remove this replica from the state machine as well as its partition assignment cache
replicastatemachine.handlestatechanges(replicasfordeletedtopic, nonexistentreplica)
val partitionsfordeletedtopic = controllercontext.partitionsfortopic(topic)
// move respective partition to offlinepartition and nonexistentpartition state
partitionstatemachine.handlestatechanges(partitionsfordeletedtopic, offlinepartition)
partitionstatemachine.handlestatechanges(partitionsfordeletedtopic, nonexistentpartition)
topicstobedeleted -= topic
partitionstobedeleted.retain(_.topic != topic)
controllercontext.zkclient.deleterecursive(zkutils.gettopicpath(topic))
controllercontext.zkclient.deleterecursive(zkutils.gettopicconfigpath(topic))
controllercontext.zkclient.delete(zkutils.getdeletetopicpath(topic))
controllercontext.removetopic(topic)

主要作用是解除掉監控分區變動的listener,刪除zookeeper具體節點信息,刪除磁盤數據,更新內存數據結構,比如從副本狀態機里面移除分區的具體信息。

其實,最終要的是我們的副本磁盤數據是如何刪除的。我們重點介紹這個部分。

首次清除的話,在刪除線程deletetopicsthread的dowork方法中

?
1
2
3
4
5
6
7
8
{
 // if you come here, then no replica is in topicdeletionstarted and all replicas are not in
 // topicdeletionsuccessful. that means, that either given topic haven't initiated deletion
 // or there is at least one failed replica (which means topic deletion should be retried).
 if(controller.replicastatemachine.isanyreplicainstate(topic, replicadeletionineligible)) {
 // mark topic for deletion retry
 marktopicfordeletionretry(topic)
 }

進入marktopicfordeletionretry

?
1
2
3
4
val failedreplicas = controller.replicastatemachine.replicasinstate(topic, replicadeletionineligible)
info("retrying delete topic for topic %s since replicas %s were not successfully deleted"
 .format(topic, failedreplicas.mkstring(",")))
controller.replicastatemachine.handlestatechanges(failedreplicas, offlinereplica)

在replicastatemachine的handlestatechanges方法中,調用了handlestatechange,處理offlinereplica

?
1
2
// send stop replica command to the replica so that it stops fetching from the leader
brokerrequestbatch.addstopreplicarequestforbrokers(list(replicaid), topic, partition, deletepartition = false)

接著在handlestatechanges中

?
1
brokerrequestbatch.sendrequeststobrokers(controller.epoch, controllercontext.correlationid.getandincrement)

給副本數據存儲節點發送stopreplicakey副本指令,并開始刪除數據

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
stopreplicarequestmap foreach { case(broker, replicainfolist) =>
 val stopreplicawithdelete = replicainfolist.filter(p => p.deletepartition == true).map(i => i.replica).toset
 val stopreplicawithoutdelete = replicainfolist.filter(p => p.deletepartition == false).map(i => i.replica).toset
 debug("the stop replica request (delete = true) sent to broker %d is %s"
 .format(broker, stopreplicawithdelete.mkstring(",")))
 debug("the stop replica request (delete = false) sent to broker %d is %s"
 .format(broker, stopreplicawithoutdelete.mkstring(",")))
 replicainfolist.foreach { r =>
 val stopreplicarequest = new stopreplicarequest(r.deletepartition,
  set(topicandpartition(r.replica.topic, r.replica.partition)), controllerid, controllerepoch, correlationid)
 controller.sendrequest(broker, stopreplicarequest, r.callback)
 }
}
stopreplicarequestmap.clear()

broker的kafkaapis的handle方法在接受到指令后

?
1
case requestkeys.stopreplicakey => handlestopreplicarequest(request)
?
1
val (response, error) = replicamanager.stopreplicas(stopreplicarequest)

接著是在stopreplicas方法中

?
1
2
3
4
5
6
7
8
9
10
{
 controllerepoch = stopreplicarequest.controllerepoch
 // first stop fetchers for all partitions, then stop the corresponding replicas
 replicafetchermanager.removefetcherforpartitions(stopreplicarequest.partitions.map(r => topicandpartition(r.topic, r.partition)))
 for(topicandpartition <- stopreplicarequest.partitions){
 val errorcode = stopreplica(topicandpartition.topic, topicandpartition.partition, stopreplicarequest.deletepartitions)
 responsemap.put(topicandpartition, errorcode)
 }
 (responsemap, errormapping.noerror)
}

進一步進入stopreplica方法,正式進入日志刪除

?
1
2
3
4
5
6
7
getpartition(topic, partitionid) match {
 case some(partition) =>
 if(deletepartition) {
  val removedpartition = allpartitions.remove((topic, partitionid))
  if (removedpartition != null)
  removedpartition.delete() // this will delete the local log
 }

以上就是kafka的整個日志刪除流水。

4,手動刪除zk上topic信息和磁盤數據

topicchangelistener會監聽處理,但是處理很簡單,只是更新了

?
1
2
3
4
5
val deletedtopics = controllercontext.alltopics -- currentchildren
controllercontext.alltopics = currentchildren
 
val addedpartitionreplicaassignment = zkutils.getreplicaassignmentfortopics(zkclient, newtopics.toseq)
controllercontext.partitionreplicaassignment = controllercontext.partitionreplicaassignment.filter(p =>

四,總結

kafka的topic的刪除過程,實際上就是基于zookeeper做了一個訂閱發布系統。zookeeper的客戶端創建一個節點/admin/delete_topics/<topic>,由kafka controller監聽到事件之后正式觸發topic的刪除:解除partition變更監聽的listener,清除內存數據結構,刪除副本數據,刪除topic的相關zookeeper節點。

delete.topic.enable配置該參數為false的情況下執行了topic的刪除命令,實際上未做任何動作。我們此時要徹底刪除topic建議修改該參數為true,重啟kafka,這樣topic信息會被徹底刪除,已經測試。

一般流行的做法是手動刪除zookeeper的topic相關信息及磁盤數據但是這樣的話會造成部分內存數據未清除。至于是否會有隱患,未測試。

好了,以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,如果有疑問大家可以留言交流,謝謝大家對服務器之家的支持。

原文鏈接:https://mp.weixin.qq.com/s/69NVkikYNdoyLyz12Z59_g

延伸 · 閱讀

精彩推薦
Weibo Article 1 Weibo Article 2 Weibo Article 3 Weibo Article 4 Weibo Article 5 Weibo Article 6 Weibo Article 7 Weibo Article 8 Weibo Article 9 Weibo Article 10 Weibo Article 11 Weibo Article 12 Weibo Article 13 Weibo Article 14 Weibo Article 15 Weibo Article 16 Weibo Article 17 Weibo Article 18 Weibo Article 19 Weibo Article 20 Weibo Article 21 Weibo Article 22 Weibo Article 23 Weibo Article 24 Weibo Article 25 Weibo Article 26 Weibo Article 27 Weibo Article 28 Weibo Article 29 Weibo Article 30 Weibo Article 31 Weibo Article 32 Weibo Article 33 Weibo Article 34 Weibo Article 35 Weibo Article 36 Weibo Article 37 Weibo Article 38 Weibo Article 39 Weibo Article 40
主站蜘蛛池模板: 欧洲一级视频 | 日本在线视频一区二区 | 婷婷成人基地 | 91麻豆精品国产91久久久资源速度 | 国产精品美乳一区二区免费 | 国产精品免费观看 | 一区二区不卡视频 | 久久综合九色综合欧美狠狠 | 天天插天天操 | 亚洲欧洲精品成人久久奇米网 | 欧美视频精品 | 欧美日韩一区精品 | 91看片 | 国产日韩久久 | 欧美日韩精品免费 | 亚洲精品欧美 | 97精品国产| 欧美精品国产精品 | 国产精品视频网 | 曰批免费视频播放免费 | 国产毛片v一区二区三区 | 亚州国产 | 欧美日韩精品一区二区在线观看 | 日本一区二区三区免费观看 | 日本免费在线 | 色综合久久久久久久久久久 | 久久伊人国产 | 成人黄色电影在线观看 | 国产一级视频在线观看 | 中文字幕高清在线 | 亚洲喷水 | 国产高清一区 | 久久综合99| 国产精品午夜电影 | 精品国产乱码久久久久久牛牛 | 亚洲一区二区三区免费视频 | 日韩欧美一级片在线观看 | 国内成人免费视频 | 久久久久综合 | 国产香蕉视频在线播放 | 国产成人久久精品一区二区三区 |