国产片侵犯亲女视频播放_亚洲精品二区_在线免费国产视频_欧美精品一区二区三区在线_少妇久久久_在线观看av不卡

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - rocketmq消費負載均衡--push消費詳解

rocketmq消費負載均衡--push消費詳解

2019-06-25 19:49有愛jj Java教程

這篇文章主要介紹了rocketmq消費負載均衡--push消費詳解,本文介紹了DefaultMQPushConsumerImpl消費者,客戶端負載均衡相關知識點。,需要的朋友可以參考下

前言

本文介紹了DefaultMQPushConsumerImpl消費者,客戶端負載均衡相關知識點。本文從DefaultMQPushConsumerImpl啟動過程到實現(xiàn)負載均衡,從源代碼一步一步分析,共分為6個部分進行介紹,其中第6個部分 rebalanceByTopic 為負載均衡的核心邏輯模塊,具體過程運用了圖文進行闡述。

介紹之前首先拋出幾個問題:

1. 要做負載均衡,首先要解決的一個問題是什么?

2. 負載均衡是Client端處理還是Broker端處理?

個人理解:

1. 要做負載均衡,首先要做的就是信號收集。

所謂信號收集,就是得知道每一個consumerGroup有哪些consumer,對應的topic是誰。信號收集分為Client端信號收集與Broker端信號收集兩個部分。

2. 負載均衡放在Client端處理。

具體做法是:消費者客戶端在啟動時完善rebalanceImpl實例,同時拷貝訂閱信息存放rebalanceImpl實例對象中,另外也是很重要的一個步驟 -- 通過心跳消息,不停的上報自己到所有Broker,注冊RegisterConsumer,等待上述過程準備好之后在Client端不斷執(zhí)行的負載均衡服務線程從Broker端獲取一份全局信息(該consumerGroup下所有的消費Client),然后分配這些全局信息,獲取當前客戶端分配到的消費隊列。

本文具體的內(nèi)容:

I. copySubscription

Client端信號收集,拷貝訂閱信息。

在DefaultMQPushConsumerImpl.start()時,會將消費者的topic訂閱關系設置到rebalanceImpl的SubscriptionInner的map中用于負載:

private void copySubscription() throws MQClientException {
try {
//注:一個consumer對象可以訂閱多個topic
Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
if (sub != null) {
for (final Map.Entry<String, String> entry : sub.entrySet()) {
final String topic = entry.getKey();
final String subString = entry.getValue();
SubscriptionData subscriptionData =
FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),//
topic, subString);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
}
if (null == this.messageListenerInner) {
this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING:
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData =
FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),//
retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
}
catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}

FilterAPI.buildSubscriptionData接口將訂閱關系轉換為SubscriptionData 數(shù)據(jù),其中subString包含訂閱tag等信息。另外,如果該消費者的消費模式為集群消費,則會將retry的topic一并放到。

II. 完善rebalanceImpl實例

Client繼續(xù)收集信息:

this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer
.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

本文以DefaultMQPushConsumerImpl為例,因此this對象類型為DefaultMQPushConsumerImp。

III. this.rebalanceService.start()

開啟負載均衡服務。this.rebalanceService是一個RebalanceService實例對象,它繼承與ServiceThread,是一個線程類。 this.rebalanceService.start()執(zhí)行時,也即執(zhí)行RebalanceService線程體:

@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStoped()) {
this.waitForRunning(WaitInterval);
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}

IV. this.mqClientFactory.doRebalance

客戶端遍歷消費組table,對該客戶端上所有消費者獨立進行負載均衡,分發(fā)消費隊列:

public void doRebalance() {
for (String group : this.consumerTable.keySet()) {
MQConsumerInner impl = this.consumerTable.get(group);
if (impl != null) {
try {
impl.doRebalance();
} catch (Exception e) {
log.error("doRebalance exception", e);
}
}
}
}

V. MQConsumerInner.doRebalance

由于本文以DefaultMQPushConsumerImpl消費過程為例,即DefaultMQPushConsumerImpl.doRebalance:

@Override
public void doRebalance() {
if (this.rebalanceImpl != null) {
this.rebalanceImpl.doRebalance();
}
}

步驟II 中完善了rebalanceImpl實例,為調(diào)用rebalanceImpl.doRebalance()提供了初始數(shù)據(jù)。

rebalanceImpl.doRebalance()過程如下:

public void doRebalance() {
     // 前文copySubscription中初始化了SubscriptionInner
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
this.rebalanceByTopic(topic);
} catch (Exception e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}

VI. rebalanceByTopic -- 核心步驟之一

rebalanceByTopic方法中根據(jù)消費者的消費類型為BROADCASTING或CLUSTERING做不同的邏輯處理。CLUSTERING邏輯包括BROADCASTING邏輯,本部分只介紹集群消費負載均衡的邏輯。

集群消費負載均衡邏輯主要代碼如下(省略了log等代碼):

//1.從topicSubscribeInfoTable列表中獲取與該topic相關的所有消息隊列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//2. 從broker端獲取消費該消費組的所有客戶端clientId
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
f (null == mqSet) { ... }
if (null == cidAll) { ... }
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
Collections.sort(mqAll);
Collections.sort(cidAll);

     // 3.創(chuàng)建DefaultMQPushConsumer對象時默認設置為AllocateMessageQueueAveragely
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

List<MessageQueue> allocateResult = null;
try {
         // 4.調(diào)用AllocateMessageQueueAveragely.allocate方法,獲取當前client分配消費隊列
allocateResult = strategy.allocate(
this.consumerGroup, 
this.mQClientFactory.getClientId(), 
mqAll,
cidAll);
} catch (Throwable e) {
return;
}
    // 5. 將分配得到的allocateResult 中的隊列放入allocateResultSet 集合
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
、
     //6. 更新updateProcessQueue
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet);
if (changed) {
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}

注:BROADCASTING邏輯只包含上述的1、6。

集群消費負載均衡邏輯中的1、2、4這三個點相關知識為其核心過程,各個點相關知識如下:

第1點:從topicSubscribeInfoTable列表中獲取與該topic相關的所有消息隊列

rocketmq消費負載均衡--push消費詳解

第2點: 從broker端獲取消費該消費組的所有客戶端clientId

首先,消費者對象不斷地向所有broker發(fā)送心跳包,上報自己,注冊并更新訂閱關系以及客戶端ChannelInfoTable;之后,客戶端在做消費負載均衡時獲取那些消費客戶端,對這些客戶端進行負載均衡,分發(fā)消費的隊列。具體過程如下圖所示:

rocketmq消費負載均衡--push消費詳解

第4點:調(diào)用AllocateMessageQueueAveragely.allocate方法,獲取當前client分配消費隊列

rocketmq消費負載均衡--push消費詳解

注:上圖中cId1、cId2、...、cIdN通過 getConsumerIdListByGroup 獲取,它們在這個ConsumerGroup下所有在線客戶端列表中。

當前消費對進行負載均衡策略后獲取對應的消息消費隊列。具體的算法很簡單,可以看源碼。

以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 久色视频在线观看 | 希岛爱理一区二区三区av高清 | 日韩一区二区三区电影在线观看 | 欧美日韩精品一区二区三区蜜桃 | 日韩成人av电影 | 亚洲免费婷婷 | 自拍 亚洲 欧美 老师 丝袜 | 91精品国产乱码久久久久久 | 黄版视频在线观看 | 日本一区二区三区四区 | 日本久久精品 | 国产精品99精品久久免费 | 色屁屁影院 | 国产精品免费av | 黄色网页在线观看 | 不卡一区二区av | 九九国产| 成人免费一区二区三区视频网站 | 国产一级一级特黄女人精品毛片 | 免费一级毛片免费播放 | 日韩在线视频观看 | 国产成人av在线播放 | 国产欧美日韩视频 | 欧美日韩一二三区 | 久久亚洲国产 | 精品一区二区三区在线观看 | 免费观看黄色 | 精品在线一区二区三区 | 成人精品一区二区三区 | 国产最新精品 | 欧美日韩在线免费观看 | 亚洲精品一区久久久久久 | 国产乱码精品一区二区三 | 午夜色福利 | 中文字幕1区2区3区 日韩免费高清视频 | 久久伊人成人网 | 在线黄色网 | 高清视频一区 | 欧美激情视频一区二区三区在线播放 | 亚洲精品久久久久久久久久久 | 操少妇逼视频 |