由于筆者公司目前使用的kafka版本是2.2.1,故當下關于kafka的內核研究目前主要是基于該版本,當然該專欄還會繼續關注Kafka3.0。
我在使用kafka時發現客戶端可以不依賴Zookeeper的情況下完成消息發送、消息消費,眾所周知早期的Kafka,所有的元信息(topic、消費組、集群)等信息都存儲在Zookeeper中,原先的消息發送客戶端、消息消費客戶端都需要依賴Zookeeper。
溫馨提示:Kafka逐步開啟了去zookeeper化,到kafka2.8之前實現了消息發送者、消息消費者的去zookeeper化,從2.8版之后broker也支持去zookeeper。
那kafka2.2.1版本中,主題的路由信息、消費組信息分別是存儲在什么地方呢?消息發送端、消息消費端是如何感知的呢?
溫馨提示:如果大家對Kafka有基本的了解,不防停留片刻,稍作思考。
1.主題元數據存儲在Zookeeper中
進入到Kafka Broker連接的Zookeeper集群,我們不難發現在 /{namespace}/brokers/topics節點下存在該集群中所有的主題信息,展開某一個具體的主題,如下圖所示:
關于主題的元信息,其實主要包括如下信息:
- 分區數量 每一個具體topic下會有一個partitions節點,該節點下的每一個子節點代表一個分區。
- 分區狀態信息 每一個分區的的狀態由葉子節點 /{namespace}/brokers/topics/{topicName}/parttions/{partNO}/state表示,存儲的內容如下:
controller_epoch 控制器當前的選舉版本。
leader 該分區的Leader所在的Broker節點ID。
version 當前的存儲格式版本,默認為1。
leader_epoch 分區Leader的選舉版本。
isr 分區的ISR集合。
主題的路由信息是存儲在Zookeeper中,那為什么客戶端只需要Broker的地址,就可以獲取到主題的路由信息呢?
1.1 主題路由尋址
查找路由信息在Kafka2.1版本中是發送ApiKeys.METADATA請求,該請求的響應邏輯定義在Broker中,那客戶端是如何對Broker進行路由,Broker中的路由信息又是從何而來呢?
消息發送者首次發送METADATA定位Broker機制:首次發送請求會從KafkaProducer的bootstrap.servers中設置的broker列表中選擇當前最空閑的Broker,后續能感知所有的Broker。
消息消費者發送METADATA定位Broker機制:發送到當前消費組的組協調所在的Broker。
根據查閱KafkaApis的handleTopicMetadataRequest方法,進行一些ACL校驗后進入其核心方法:
關鍵點:
- 從MetadataCache中獲取topic到路由信息。
- 如果MetadataCache中不存在指定topic的路由信息,如果Broker允許自動創建主題(auto.create.topics.enable),默認為true,則自動創建該主題的信息,并將主題信息寫入到zookeeper,具體操作:
在/brokers/topics節點下創建子節點,子節點名稱為topic的名稱。
根據當前kafka分區的機架信息,分區數、副本數,broker節點數,進行分配,主要盡量將主分區不放在同一個機架、存儲在主題的節點信息中,例如{"version":1,"partitions":{"4":[2,0,1],"5":[0,1,2],"1":[2,1,0],"0":[1,0,2],"2":[0,2,1],"3":[1,2,0]}},其中key為分區名稱,值為副本所在的brokerId,其中排在第一位是傾向性Leader,主題中存儲的值是靜態數據,具體還會觸發選舉,選舉算法會參考這個分配。
控制器還會注冊調用registerPartitionModificationsHandlers方法,監聽主題信息的變化,從而觸發后續流程,啟動分區的真正創建(各個分區的Leader選舉等)。
溫馨提示:Kafka開啟自動創建主題,分區數量取自kafka broker中的num.partitions參數,默認為1,副本因子則取決于default.replication.factor參數,默認為1。
1.2 路由信息同步機制
MetadataCache,元信息緩存,那這里的數據又是從何而來呢?MetadataCache中路由信息的更新調用鏈如下圖所示:
Kafka的KafkaController(后續統稱控制器)首先會聽/brokers/topics/{topicName}節點內容的變化,一旦有新主題創建或主題信息變更,topic變更事件就會觸發,此時TopicChange的process方法會調用,最終調用updatePartitionReplicaAssignment,也就是一旦主題的信息發生變更,控制器會向所有Broker節點發送ApiKeys.UPDATE_METADATA,各個Broker在到該請求后,會更新各個Broker中的內存緩存,供消息發送者查找topic路由信息。
即Kafka2.2版本中,topic的元信息存儲在Zookeeper中,同時Kafka Controller會監聽zookeeper中相關節點,從而感知信息變更,從而將路由信息通過RPC發送到集群內所有的Broker中,故每一個Broker的內存中都存儲一份相同的路由信息。
Kafka2.8版本開始嘗試去Zookeeper化。
思考題:為什么各個Broker不都監聽zookeeper,從而感知topic變化,更新本地內存呢?歡迎各位留言討論或私信dingwpmz,共同交流。
2.消費組存儲在位點主題中
在較低版本中,啟動Kafka消費組需要指定zookeeper集群的地址,因為在低版本中消費組的元信息存儲在zookeeper中,具體路徑為/consumers,但后續版本中消費端的啟動已經不需指定zookeeper,而是指定broker的地址列表即可,那這個時候,消費組的信息是存儲在哪呢?
在前面介紹Kafka故障解決相關的文章中我們常常看到消費組組協調器,內部持有一個消費組元數據管理器GroupMetadataManager,相關的代碼截圖如下所示:
在GroupMetadataManager對象中持有一個Map結構的緩存,其鍵為消費組的名稱,值為GroupMetadata對象,內部記錄消費組的狀態,消費組的成員列表,位點信息。
內存的特點:訪問高效,但隨著Broker進程的退出而丟失,消費組存儲在內存中顯然不行,但又不在zookeeper中,那消費組的定義信息存儲在什么地方呢?
2.1消費組元信息存儲
消費組的定義信息存儲在系統主題__consumer_offsets中,什么,這個主題不是用來存儲消費位點的嗎?
原來__consumer_offsets不僅存儲消費組的位點信息,還存儲消費組的元信息,具體代碼入口:GroupMetadataManager#storeGroup,部分代碼截圖如下所示:
即消費組元信息當成一條消息寫入到__consumer_offsets,一條消費組元信息存儲的value值,由GroupMetadataManager的groupMetadataValue方法定義,具體代碼如下:
隨著Kafka的不斷演化,存儲格式進行了多次修改,對應的版本如下:
- V0:Kafka 0.10級以下版本
- V1:大于 0.10,低于等于2.1版本。
- V2:2.2版本及以后
消費組元信息存儲的格式為Json,具體存儲的內容:
- protocol_type 協議版本,取自AbstractCoordinator的抽象方法protocolType(),消費組的固定為:consumer。
- generation 消費組元信息的版本號,每發生一次消費組重平衡,該值會加一。
- protocol 協議內容,存儲消費組的隊列負載算法,在構建消費者時可通過partition.assignment.strategy參數傳遞,可以傳遞多個,消費組具體的負載算法會選擇每一個消費者都支持的協議進行隊列負載,默認的負載算法為RangeAssignor。
- leader 當前消費組的Leader,通常為第一個加入該消費組的消費者。
- current_state_timestamp 最新狀態變更的時間戳,該值是從V2版本開始引入。
- members 消費組的成員信息,每一個成員信息存儲的信息如下:
- member_id 成員id,客戶端id(clientId) + uuid。
client_id 客戶端ID。
client_host 客戶端ip地址。
rebalance_timeout 重平衡時間,默認為300000,5分鐘。
session_timeout 會話超時時間,默認為10s。
subscription 元信息,取自AbstractCoordinator的抽象方法metadata(),消費組的實現類為ConsumerCoordinator,主要是遍歷負載算法,每一個負載算法根據訂閱信息計算元信息。
assignment
各個消費者的隊列負載情況。
溫馨提示:GroupMetadataManager的storeGroup方法的調用時間是在消費組進行重平衡時,具體是重平衡第二階段(SYNC_GROUP)與完成重平衡。
2.2加載消息組元信息
消費組元信息是存儲在 __consumer_offsets主題中,在什么時候會從該主題中加載到內存中呢?
在__consumer_offsets的分區發生Leader選舉時會觸發將對應分區中的數據加載到內存,具體的處理入口在KafkaApis的handleLeaderAndIsrRequest方法,簡易調用鏈如下圖所示:
3.總結
本文主要介紹了Kafka 主題與消費組的持久化機制,在Kafka2.8版本開始,官方逐步去除對Zookeeper的依賴,那kafka3.x之后,又會是如何存儲消費組、主題的信息呢?
原文地址:https://mp.weixin.qq.com/s/ekMjAmvvCSzkFv7RVoO0GA