Preface
Apache Kafka originated at LinkedIn, became an Apache incubator project in 2011, and graduated to a top-level Apache project in 2012. Kafka is written in Scala and Java. Apache Kafka is a fast, scalable, high-throughput, fault-tolerant distributed publish-subscribe messaging system. With high throughput, built-in partitioning, replication, and fault tolerance, it is well suited to large-scale message processing.
As before, this article uses Kafka 0.8.2.2 as the example.
Deleting a topic involves two key steps:
1. Configure the delete parameter
Set the broker parameter delete.topic.enable to true.
2. Run the delete command
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name
If the delete parameter is not set to true, the topic is not actually removed; it is only marked for deletion. At that point many people simply delete the topic's ZooKeeper nodes and log files by hand, but this does not clear the topic data held in the Kafka brokers' memory. The better approach is to set the delete parameter to true and then restart Kafka.
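If deletion is to be enabled, a minimal sequence looks like the sketch below (the config file path and restart commands assume a stock Kafka 0.8.2.2 distribution; adjust them to your deployment):

# config/server.properties on every broker
delete.topic.enable=true

# restart the brokers so the setting takes effect, then issue the delete
bin/kafka-server-stop.sh
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name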
II. Key Classes
1. PartitionStateMachine
The state machine for partitions: it tracks each partition's current state and drives the transitions between states. There are four states:
- NonExistentPartition
- NewPartition
- OnlinePartition
- OfflinePartition
2. ReplicaManager
Manages all replicas on the local broker and carries out the concrete actions such as reads, writes, and deletes.
Reads and writes: a write first obtains the Partition object, then the Replica, then the Log, and finally uses the segments managed by the Log to write data in or read it out.
3. ReplicaStateMachine
The state machine for replicas: it tracks each replica's current state and the transitions between states. A replica is always in exactly one of the following states (a small sketch of the transition graph follows the list):
NewReplica: the controller can create a replica in this state during partition reassignment; it can only accept a request to become a follower. Valid previous state: NonExistentReplica.
OnlineReplica: a replica of a started partition; it can accept requests to become leader or follower. Valid previous states: NewReplica, OnlineReplica, or OfflineReplica.
OfflineReplica: a replica that has died is in this state. Valid previous states: NewReplica, OnlineReplica.
ReplicaDeletionStarted: replica deletion has begun. Valid previous state: OfflineReplica.
ReplicaDeletionSuccessful: the replica was deleted successfully. Valid previous state: ReplicaDeletionStarted.
ReplicaDeletionIneligible: the deletion failed. Valid previous state: ReplicaDeletionStarted.
NonExistentReplica: the replica has been fully removed. Valid previous state: ReplicaDeletionSuccessful.
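To make the transition graph easier to scan, here is a small self-contained Scala sketch that simply encodes the list above as data; it is an illustration, not Kafka's actual ReplicaStateMachine code:

// Hypothetical encoding of the replica states listed above and their valid previous states.
sealed trait ReplicaState
case object NonExistentReplica extends ReplicaState
case object NewReplica extends ReplicaState
case object OnlineReplica extends ReplicaState
case object OfflineReplica extends ReplicaState
case object ReplicaDeletionStarted extends ReplicaState
case object ReplicaDeletionSuccessful extends ReplicaState
case object ReplicaDeletionIneligible extends ReplicaState

object ReplicaStateGraph {
  // target state -> the states it may legally be entered from
  val validPreviousStates: Map[ReplicaState, Set[ReplicaState]] = Map(
    NewReplica                -> Set(NonExistentReplica),
    OnlineReplica             -> Set(NewReplica, OnlineReplica, OfflineReplica),
    OfflineReplica            -> Set(NewReplica, OnlineReplica),
    ReplicaDeletionStarted    -> Set(OfflineReplica),
    ReplicaDeletionSuccessful -> Set(ReplicaDeletionStarted),
    ReplicaDeletionIneligible -> Set(ReplicaDeletionStarted),
    NonExistentReplica        -> Set(ReplicaDeletionSuccessful)
  )

  def canTransition(from: ReplicaState, to: ReplicaState): Boolean =
    validPreviousStates.getOrElse(to, Set.empty).contains(from)
}

For example, ReplicaStateGraph.canTransition(OfflineReplica, ReplicaDeletionStarted) returns true, while a jump straight from NewReplica to ReplicaDeletionStarted does not.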
4. TopicDeletionManager
This class manages the topic deletion state machine:
1) TopicCommand issues the delete command by creating the znode /admin/delete_topics/<topic>.
2) The controller watches for child changes under /admin/delete_topics and kicks off deletion of the listed topics (a minimal watcher sketch follows this list).
3) A background thread inside the controller carries out the actual deletion.
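As a rough standalone illustration of step 2 (this is not the controller's actual DeleteTopicsListener; the ZooKeeper address is an assumption), watching that path with ZkClient looks roughly like this:

import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
import scala.collection.JavaConverters._

// Hypothetical standalone watcher for /admin/delete_topics, for illustration only.
object DeleteTopicsWatchSketch {
  def main(args: Array[String]): Unit = {
    val zkClient = new ZkClient("zk_host:2181", 30000, 30000) // assumed ZooKeeper address
    zkClient.subscribeChildChanges("/admin/delete_topics", new IZkChildListener {
      override def handleChildChange(parentPath: String, children: java.util.List[String]): Unit = {
        // children holds the topics currently queued for deletion
        val queued = Option(children).map(_.asScala.mkString(",")).getOrElse("")
        println(s"topics queued for deletion: $queued")
      }
    })
    Thread.sleep(60000) // keep the process alive long enough to observe a change
    zkClient.close()
  }
}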
III. A Source-Level Walkthrough of the Topic Deletion Process
This part is divided into four pieces:
a) What the client-side delete command actually does
b) The full code path when delete.topic.enable is not enabled
c) The full code path when delete.topic.enable is enabled
d) Manually deleting the topic's ZooKeeper metadata and on-disk data
1. The client-side delete command
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name
Opening kafka-topics.sh, we see:
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand $@
Inside TopicCommand, in its main method:
else if (opts.options.has(opts.deleteOpt))
  deleteTopic(zkClient, opts)
The actual work it does is:
val topics = getTopics(zkClient, opts)
if (topics.length == 0) {
  println("Topic %s does not exist".format(opts.options.valueOf(opts.topicOpt)))
}
topics.foreach { topic =>
  try {
    ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic))
在"/admin/delete_topics"目錄下創建了一個topicname的節點。
2. The flow when delete.topic.enable is not enabled
Two listeners can respond to the change:
a) TopicChangeListener
b) DeleteTopicsListener
When a topic is removed via the delete command, only the DeleteTopicsListener fires.
var topicsToBeDeleted = {
  import JavaConversions._
  (children: Buffer[String]).toSet
}
val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t))
topicsToBeDeleted --= nonExistentTopics
if (topicsToBeDeleted.size > 0) {
  info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
  // mark topic ineligible for deletion if other state changes are in progress
  topicsToBeDeleted.foreach { topic =>
    val preferredReplicaElectionInProgress =
      controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
    val partitionReassignmentInProgress =
      controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
    if (preferredReplicaElectionInProgress || partitionReassignmentInProgress)
      controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
  }
  // add topic to deletion list
  controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
}
Both of the calls below check internally whether delete.topic.enable is true; if it is not, they do nothing, and if it is, they proceed:
controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
3. The flow when delete.topic.enable is set to true
The difference from step 2 lies entirely in those two handler functions.
controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
markTopicIneligibleForDeletion does the following:
if (isDeleteTopicEnabled) {
  val newTopicsToHaltDeletion = topicsToBeDeleted & topics
  topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
  if (newTopicsToHaltDeletion.size > 0)
    info("Halted deletion of topics %s".format(newTopicsToHaltDeletion.mkString(",")))
}
Its main purpose is to halt topic deletion when any of the following three situations is present:
* halt delete topic if -
* 1. replicas being down
* 2. partition reassignment in progress for some partitions of the topic
* 3. preferred replica election in progress for some partitions of the topic
enqueueTopicsForDeletion mainly updates the set of topics to be deleted and wakes up the DeleteTopicsThread:
def enqueueTopicsForDeletion(topics: Set[String]) {
  if (isDeleteTopicEnabled) {
    topicsToBeDeleted ++= topics
    partitionsToBeDeleted ++= topics.flatMap(controllerContext.partitionsForTopic)
    resumeTopicDeletionThread()
  }
}
In the doWork method of the deletion thread DeleteTopicsThread:
topicsQueuedForDeletion.foreach { topic =>
  // if all replicas are marked as deleted successfully, then topic deletion is done
  if (controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
    // clear up all state for this topic from controller cache and zookeeper
    completeDeleteTopic(topic)
    info("Deletion of topic %s successfully completed".format(topic))
  }
Stepping into the completeDeleteTopic method:
// deregister partition change listener on the deleted topic. this is to prevent the partition change listener
// firing before the new topic listener when a deleted topic gets auto created
partitionStateMachine.deregisterPartitionChangeListener(topic)
val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
// controller will remove this replica from the state machine as well as its partition assignment cache
replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)
val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
// move respective partition to OfflinePartition and NonExistentPartition state
partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)
partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
topicsToBeDeleted -= topic
partitionsToBeDeleted.retain(_.topic != topic)
controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicConfigPath(topic))
controllerContext.zkClient.delete(ZkUtils.getDeleteTopicPath(topic))
controllerContext.removeTopic(topic)
Its main job is to deregister the listener that watches the topic's partition changes, delete the topic's ZooKeeper nodes, and update the controller's in-memory structures, for instance removing the topic's replicas and partitions from the state machines. What matters most, though, is how the replica data on disk gets deleted, and that is the part we focus on next.
For the first deletion pass, in the doWork method of DeleteTopicsThread:
{
  // if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
  // TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion
  // or there is at least one failed replica (which means topic deletion should be retried).
  if (controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
    // mark topic for deletion retry
    markTopicForDeletionRetry(topic)
  }
Stepping into markTopicForDeletionRetry:
val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
info("Retrying delete topic for topic %s since replicas %s were not successfully deleted"
  .format(topic, failedReplicas.mkString(",")))
controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica)
In ReplicaStateMachine's handleStateChanges, handleStateChange is invoked to handle the OfflineReplica transition:
// send stop replica command to the replica so that it stops fetching from the leader
brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
Then, back in handleStateChanges:
brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
This sends the StopReplica instruction (RequestKeys.StopReplicaKey) to the brokers that store the replica data, and the data deletion begins:
stopReplicaRequestMap foreach { case (broker, replicaInfoList) =>
  val stopReplicaWithDelete = replicaInfoList.filter(p => p.deletePartition == true).map(i => i.replica).toSet
  val stopReplicaWithoutDelete = replicaInfoList.filter(p => p.deletePartition == false).map(i => i.replica).toSet
  debug("The stop replica request (delete = true) sent to broker %d is %s"
    .format(broker, stopReplicaWithDelete.mkString(",")))
  debug("The stop replica request (delete = false) sent to broker %d is %s"
    .format(broker, stopReplicaWithoutDelete.mkString(",")))
  replicaInfoList.foreach { r =>
    val stopReplicaRequest = new StopReplicaRequest(r.deletePartition,
      Set(TopicAndPartition(r.replica.topic, r.replica.partition)), controllerId, controllerEpoch, correlationId)
    controller.sendRequest(broker, stopReplicaRequest, r.callback)
  }
}
stopReplicaRequestMap.clear()
On the broker side, when the handle method of KafkaApis receives the request:
case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
Next, in the stopReplicas method:
{
  controllerEpoch = stopReplicaRequest.controllerEpoch
  // first stop fetchers for all partitions, then stop the corresponding replicas
  replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map(r => TopicAndPartition(r.topic, r.partition)))
  for (topicAndPartition <- stopReplicaRequest.partitions) {
    val errorCode = stopReplica(topicAndPartition.topic, topicAndPartition.partition, stopReplicaRequest.deletePartitions)
    responseMap.put(topicAndPartition, errorCode)
  }
  (responseMap, ErrorMapping.NoError)
}
Going one level deeper into the stopReplica method, we finally reach the log deletion itself:
getPartition(topic, partitionId) match {
  case Some(partition) =>
    if (deletePartition) {
      val removedPartition = allPartitions.remove((topic, partitionId))
      if (removedPartition != null)
        removedPartition.delete() // this will delete the local log
    }
That is Kafka's entire log deletion pipeline.
4. Manually deleting the topic's ZooKeeper metadata and on-disk data
TopicChangeListener does react to this, but its handling is very simple; it only updates the controller's caches:
val deletedTopics = controllerContext.allTopics -- currentChildren
controllerContext.allTopics = currentChildren
val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)
controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
  !deletedTopics.contains(p.topic))
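For completeness, "manually deleting the topic's ZooKeeper metadata" usually means removing znodes like the ones in this hypothetical sketch (the paths follow Kafka's standard ZooKeeper layout; the connection string is an assumption). As noted above, this leaves the controller's and brokers' in-memory state untouched:

import org.I0Itec.zkclient.ZkClient

// Hypothetical sketch of the common manual cleanup; it does NOT clear any broker or controller memory state.
object ManualTopicCleanupSketch {
  def main(args: Array[String]): Unit = {
    val zkClient = new ZkClient("zk_host:2181", 30000, 30000) // assumed ZooKeeper address
    val topic = "my_topic_name"
    zkClient.deleteRecursive("/brokers/topics/" + topic)      // topic and partition assignment
    zkClient.deleteRecursive("/config/topics/" + topic)       // per-topic config overrides
    zkClient.deleteRecursive("/admin/delete_topics/" + topic) // pending-deletion marker, if present
    zkClient.close()
    // The partition directories (e.g. <log.dirs>/my_topic_name-0) still have to be removed
    // from each broker's disk by hand.
  }
}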
IV. Summary
Kafka's topic deletion is essentially a small publish-subscribe system built on ZooKeeper. A ZooKeeper client creates the node /admin/delete_topics/<topic>, and once the Kafka controller observes the event it performs the actual deletion: it deregisters the partition-change listener, clears the in-memory data structures, deletes the replica data, and removes the topic's ZooKeeper nodes.
If the delete command is executed while delete.topic.enable is false, essentially nothing is done. To remove such a topic completely, set the parameter to true and restart Kafka; the topic will then be deleted for good. This has been verified by testing.
A popular alternative is to manually delete the topic's ZooKeeper metadata and on-disk data, but that leaves some in-memory state uncleared. Whether this causes problems has not been tested.
Original article: https://mp.weixin.qq.com/s/69NVkikYNdoyLyz12Z59_g