国产片侵犯亲女视频播放_亚洲精品二区_在线免费国产视频_欧美精品一区二区三区在线_少妇久久久_在线观看av不卡

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - java rocketmq--消息的產生(普通消息)

java rocketmq--消息的產生(普通消息)

2019-06-26 13:06有愛jj Java教程

這篇文章主要介紹了java rocketmq--消息的產生(普通消息),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,,需要的朋友可以參考下

前言

與消息發送緊密相關的幾行代碼:

1. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

2. producer.start();

3. Message msg = new Message(...)

4. SendResult sendResult = producer.send(msg);

5. producer.shutdown();

那這幾行代碼執行時,背后都做了什么?

一. 首先是DefaultMQProducer.start 

@Override
public void start() throws MQClientException {
this.defaultMQProducerImpl.start();
}

調用了默認生成消息的實現類 -- DefaultMQProducerImpl

調用defaultMQProducerImpl.start()方法,DefaultMQProducerImpl.start()會初始化得到MQClientInstance實例對象,MQClientInstance實例對象調用它自己的start方法會 ,啟動一些服務,如拉去消息服務PullMessageService.Start()、啟動負載平衡服務RebalanceService.Start(),比如網絡通信服務MQClientAPIImpl.Start()

另外,還會執行與生產消息相關的信息,如注冊produceGroup、new一個TopicPublishInfo對象并以默認TopicKey為鍵值,構成鍵值對存入DefaultMQProducerImpl的topicPublishInfoTable中。

efaultMQProducerImpl.start()后,獲取的MQClientInstance實例對象會調用sendHeartbeatToAllBroker()方法,不斷向broker發送心跳包,yin'b可以使用下面一幅圖大致描述DefaultMQProducerImpl.start()過程:

java rocketmq--消息的產生(普通消息)

上圖中的三個部分中涉及的內容:

1.1 初始化MQClientInstance

一個客戶端只能產生一個MQClientInstance實例對象,產生方式使用了工廠模式與單例模式。MQClientInstance.start()方法啟動一些服務,源碼如下:

public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}

1.2 注冊producer

該過程會將這個當前producer對象注冊到MQClientInstance實例對象的的producerTable中。一個jvm(一個客戶端)中一個producerGroup只能有一個實例,MQClientInstance操作producerTable大概有如下幾個方法:

  • -- selectProducer
  • -- updateTopicRouteInfoFromNameServer
  • -- prepareHeartbeatData
  • -- isNeedUpdateTopicRouteInfo
  • -- shutdown

注:

根據不同的clientId,MQClientManager將給出不同的MQClientInstance;

根據不同的group,MQClientInstance將給出不同的MQProducer和MQConsumer

1.3 向路由信息表中添加路由

topicPublishInfoTable定義:

public class DefaultMQProducerImpl implements MQProducerInner {
private final Logger log = ClientLogger.getLog();
private final Random random = new Random();
private final DefaultMQProducer defaultMQProducer;
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>();

它是一個以topic為key的Map型數據結構,DefaultMQProducerImpl.start()時會默認創建一個key=MixAll.DEFAULT_TOPIC的TopicPublishInfo存放到topicPublishInfoTable中。

1.4 發送心跳包

MQClientInstance向broker發送心跳包時,調用sendHeartbeatToAllBroker( ),以及從MQClientInstance實例對象的brokerAddrTable中拿到所有broker地址,向這些broker發送心跳包。

sendHeartbeatToAllBroker會涉及到prepareHeartbeatData()方法,該方法會生成heartbeatData數據,發送心跳包時,heartbeatData作為心跳包的body。與producer相關的部分代碼如下:

// Producer
for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
MQProducerInner impl = entry.getValue();
if (impl != null) {
ProducerData producerData = new ProducerData();
producerData.setGroupName(entry.getKey());
heartbeatData.getProducerDataSet().add(producerData);
}

二、. SendResult sendResult = producer.send(msg)

首先會調用DefaultMQProducer.send(msg) ,繼而調用sendDefaultImpl:

public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

sendDefaultImpl做了啥?

2.1. 獲取topicPublishInfo

根據msg的topic從topicPublishInfoTable獲取對應的topicPublishInfo,如果沒有則更新路由信息,從nameserver端拉取最新路由信息。從nameserver端拉取最新路由信息大致為:

首先getTopicRouteInfoFromNameServer,然后topicRouteData2TopicPublishInfo。

java rocketmq--消息的產生(普通消息)

2.2 選擇消息發送的隊列

普通消息:默認方式下,selectOneMessageQueue從topicPublishInfo中的messageQueueList中選擇一個隊列(MessageQueue)進行發送消息,默認采用長輪詢的方式選擇隊列 。

它的機制如下:正常情況下,順序選擇queue進行發送;如果某一個節點發生了超時,則下次選擇queue時,跳過相同的broker。不同的隊列選擇策略形成了生產消息的幾種模式,如順序消息,事務消息。

順序消息:將一組需要有序消費的消息發往同一個broker的同一個隊列上即可實現順序消息,假設相同訂單號的支付,退款需要放到同一個隊列,那么就可以在send的時候,自己實現MessageQueueSelector,根據參數arg字段來選擇queue。

private SendResult sendSelectImpl(
Message msg,
MessageQueueSelector selector,
Object arg,
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 。。。}

事務消息:只有在消息發送成功,并且本地操作執行成功時,才發送提交事務消息,做事務提交,消息發送失敗,直接發送回滾消息,進行回滾,具體如何實現后面會單獨成文分析。

2.3 封裝消息體通信包,發送數據包

首先,根據獲取的MessageQueue中的getBrokerName,調用findBrokerAddressInPublish得到該消息存放對應的broker地址,如果沒有找到則跟新路由信息,重新獲取地址 :

brokerAddrTable.get(brokerName).get(MixAll.MASTER_ID)

可知獲取的broker均為master(id=0)

然后, 將與該消息相關信息打包成RemotingCommand數據包,其RequestCode.SEND_MESSAGE

根據獲取的broke地址,將數據包到對應的broker,默認是發送超時時間為3s。

封裝消息請求包的包頭:

SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);

發送消息包(普通消息默認為同步方式):

SendResult sendResult = null;
switch (communicationMode) {
   case SYNC:
  sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
  brokerAddr,
  mq.getBrokerName(),
   msg,
  requestHeader,
   timeout,
  communicationMode,
  context,
  this);
break;

處理來自broker端的響應數據包:

private SendResult sendMessageSync(
final String addr,
final String brokerName,
final Message msg,
final long timeoutMillis,
final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
return this.processSendResponse(brokerName, msg, response);
}

broker端處理request數據包后會將消息存儲到commitLog,具體過程后續分析。

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。

延伸 · 閱讀

精彩推薦
  • Java教程淺談Java(SpringBoot)基于zookeeper的分布式鎖實現

    淺談Java(SpringBoot)基于zookeeper的分布式鎖實現

    這篇文章主要介紹了Java(SpringBoot)基于zookeeper的分布式鎖實現,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的...

    LJY_SUPER5742021-07-21
  • Java教程Java之Springcloud Feign組件詳解

    Java之Springcloud Feign組件詳解

    這篇文章主要介紹了Java之Springcloud Feign組件詳解,本篇文章通過簡要的案例,講解了該項技術的了解與使用,以下就是詳細內容,需要的朋友可以參考下...

    深情以改10322021-11-12
  • Java教程JAVA中通過自定義注解進行數據驗證的方法

    JAVA中通過自定義注解進行數據驗證的方法

    java 自定義注解驗證可自己添加所需要的注解,下面這篇文章主要給大家介紹了關于JAVA中通過自定義注解進行數據驗證的相關資料,文中通過示例代碼介紹...

    Decouple6362021-05-25
  • Java教程Java list.remove( )方法注意事項

    Java list.remove( )方法注意事項

    這篇文章主要介紹了Java list.remove( )方法注意事項,非常簡單易懂,需要的朋友可以參考下...

    妖久9552021-05-25
  • Java教程java 中鎖的性能提高辦法

    java 中鎖的性能提高辦法

    這篇文章主要介紹了java 中鎖的性能提高辦法的相關資料,需要的朋友可以參考下...

    Java之家3092020-08-13
  • Java教程springboot ehcache 配置使用方法代碼詳解

    springboot ehcache 配置使用方法代碼詳解

    EhCache是一個比較成熟的Java緩存框架,Springboot對ehcache的使用非常支持,所以在Springboot中只需做些配置就可使用,且使用方式也簡易,今天給大家分享spri...

    m1719309529412912021-09-16
  • Java教程SpringBoot引入Thymeleaf的實現方法

    SpringBoot引入Thymeleaf的實現方法

    這篇文章主要介紹了SpringBoot引入Thymeleaf的實現方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下...

    Bobby6472021-07-28
  • Java教程JavaWeb 實現驗證碼功能(demo)

    JavaWeb 實現驗證碼功能(demo)

    在 WEB-APP 中一般應用于:登錄、注冊、買某票、秒殺等場景,大家都接觸過這個驗證碼操作,今天小編通過實例代碼給大家講解javaweb實現驗證碼功能,需要...

    java教程網12832020-08-05
主站蜘蛛池模板: 亚洲成人免费在线播放 | 国产精品久久久久久久久久免费看 | 精品国产一区二区三区在线观看 | 天天干天天操天天干 | 七七婷婷婷婷精品国产 | 不卡久久 | 亚洲网站在线 | 视频一区二区三区中文字幕 | 精品在线视频一区 | 欧美日韩不卡合集视频 | 桃色五月 | 欧美视频三区 | 91国内精品久久 | 日韩中文字幕av | 国产欧美一区二区三区在线看 | 亚洲精品一区二区三区蜜桃久 | 综合久久综合久久 | 福利视频网站 | 视频在线一区二区 | 日韩精品一区二区三区在线观看 | 婷婷综合在线 | 亚洲一区二区在线 | 911av视频 | 偷拍自拍网 | 成人黄色av | av在线电影网 | 污污视频免费网站 | 久久精品国产欧美亚洲人人爽 | 中文字幕日韩欧美 | 国产精品久久久久久a | 天堂√在线观看一区二区 | 久久精品一区二区三区四区 | 亚洲精品短视频 | 国产精品久久久久久久久久久免费看 | 91精品国产91久久久久久最新 | 欧美日韩一级电影 | porn在线视频 | 欧美精品亚洲精品 | 999久久久国产999久久久 | 国产精品伦理一区二区 | 亚洲一区视频在线 |