国产片侵犯亲女视频播放_亚洲精品二区_在线免费国产视频_欧美精品一区二区三区在线_少妇久久久_在线观看av不卡

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - 使用RocketMQTemplate發送帶tags的消息

使用RocketMQTemplate發送帶tags的消息

2021-09-24 11:51wgslucky Java教程

這篇文章主要介紹了使用RocketMQTemplate發送帶tags的消息,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教

RocketMQTemplate發送帶tags的消息

RocketMQTemplate是RocketMQ集成到Spring cloud之后提供的個方便發送消息的模板類,它是基本Spring 的消息機制實現的,對外只提供了Spring抽象出來的消息發送接口。

在單獨使用RocketMQ的時候,發送消息使用的Message是‘org.apache.rocketmq.common.message'包下面的Message,而使用RocketMQTemplate發送消息時,使用的Message是org.springframework.messaging的Message,猛一看,沒辦法發送帶tags的消息了,其實在RocketMQ集成的時候已經解決了這個問題。

在RocketMQTemplate發送消息時,調用的方法是:

  1. public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {
  2. if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
  3. log.error("syncSendOrderly failed. destination:{}, message is null ", destination);
  4. throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
  5. }
  6. try {
  7. long now = System.currentTimeMillis();
  8. //在這里對消息進行了轉化,將Spring的message轉化為rocketmq自己的message
  9. org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
  10. charset, destination, message);
  11. SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);
  12. long costTime = System.currentTimeMillis() - now;
  13. log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
  14. return sendResult;
  15. } catch (Exception e) {
  16. log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
  17. throw new MessagingException(e.getMessage(), e);
  18. }
  19. }

在上面的代碼中,對消息進行了轉化,將Spring的message轉化為rocketmq自己的message,在RocketMQUtil.convertToRocketMessage方法中有個地方就是獲取tags的:

  1. String[] tempArr = destination.split(":", 2);
  2. String topic = tempArr[0];
  3. String tags = "";
  4. if (tempArr.length > 1) {
  5. tags = tempArr[1];
  6. }

所以,在發送消息的時候,我們只要把tags使用":"添加到topic后面就可以了。

例如:xxxx:tag1 || tag2 || tag3

使用RocketMQ 處理消息

消息發送(生產者)

以maven + SpringBoot 工程為例,先在pom.xml增加依賴

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-spring-boot-starter</artifactId>
  4. <version>2.0.1</version>
  5. </dependency>

由于,這個依賴是一個starter,直接引入依賴就可以開始寫投遞消息的代碼了。這個starter注冊了一個叫org.apache.rocketmq.spring.core.RocketMQTemplate的bean,用它就可以直接把消息投遞出去。 具體的API是這樣的

  1. XXXEvent xxxDto = new XXXEvent();
  2. Message<XXXEvent> message = MessageBuilder.withPayload(xxxDto).build();
  3. String dest = String.format("%s:%s",topic-name","tag-name");
  4. //默認投遞:同步發送 不會丟失消息。如果在投遞成功后發生網絡異常,客戶端會認為投遞失敗而回滾本地事務
  5. this.rocketMQTemplate.send(dest, xxxDto);

這種投遞方式能保證投遞成功的消息不會丟失,但是不能保證投遞一定成功。假設一次調用的流程是這樣的

使用RocketMQTemplate發送帶tags的消息

如果在步驟3的時候發生錯誤,因為出錯mqClient會認為消息投遞失敗而把事務回滾。如果消息已經被消費,那就會導致業務錯誤。我們可以用事務消息解決這個問題。

以帶事務方式投遞的消息,正常情況下的處理流程是這樣的

使用RocketMQTemplate發送帶tags的消息

出錯的時候是這樣的

使用RocketMQTemplate發送帶tags的消息

由于普通消息沒有消息回查,普通消息用的producer不支持回查操作,不同業務的回查處理也不一樣,事務消息需要使用單獨的producer。消息發送代碼大概是這樣的

  1. //調用這段代碼之前別做會影響數據的操作
  2. XXXEvent xxxDto = new XXXEvent();
  3. Message<XXXEvent> message = MessageBuilder.withPayload(xxxDto).build();
  4. String dest = String.format("%s:%s",topic-name","tag-name");
  5. TransactionSendResult transactionSendResult = this.rocketMQTemplate.sendMessageInTransaction("poducer-name","topic-name:tag-name",message,"xxxid");
  6. if (LocalTransactionState.ROLLBACK_MESSAGE.equals(transactionSendResult.getLocalTransactionState())){
  7. throw new RuntimeException("事務消息投遞失敗");
  8. }
  9. //按照RocketMQ的寫法,這個地方不應該有別的代碼
  1. @RocketMQTransactionListener(txProducerGroup = "producer")
  2. class TransactionListenerImpl implements RocketMQLocalTransactionListener {
  3.  
  4. //消息投遞成功后執行的邏輯(半消息)
  5. //原文:When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
  6. @Override
  7. public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
  8. try{
  9. //
  10. xxxService.doSomething();
  11. return RocketMQLocalTransactionState.COMMIT;
  12. catch(IOException e){
  13. //不確定最終是否成功
  14. return RocketMQLocalTransactionState.UNKNOWN;
  15. }catch(Exception e){
  16. return RocketMQLocalTransactionState.ROLLBACK;
  17. }
  18. }
  19. //回查事務執行狀態
  20. @Override
  21. public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
  22. Boolean result = xxxService.isSuccess(msg,arg);
  23. if(result != null){
  24. if(result){
  25. return RocketMQLocalTransactionState.COMMIT;
  26. }else{
  27. return RocketMQLocalTransactionState.ROLLBACK;
  28. }
  29. }
  30. return RocketMQLocalTransactionState.UNKNOWN;
  31. }
  32. }

處理消息(消費)

普通消息和事務消息的區別只在投遞的時候才明顯,對應的消費端代碼比較簡單

  1. import lombok.extern.slf4j.Slf4j;
  2. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
  3. import org.apache.rocketmq.spring.core.RocketMQListener;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.data.redis.core.RedisTemplate;
  6. import org.springframework.data.redis.core.StringRedisTemplate;
  7. import org.springframework.stereotype.Component;
  8. @Slf4j
  9. @Component
  10. @RocketMQMessageListener(consumerGroup = "xxx-consumer", topic = "topic-name",selectorExpression = "tag-name")
  11. public class XXXEventMQListener implements RocketMQListener<XXXEvent> {
  12. private String repeatCheckRedisKeyTemplate = "topic-name:tag:repeat-check:%s";
  13. @Autowired private StringRedisTemplate redisTemplate;
  14. @Override
  15. public void onMessage(XXXEvent message) {
  16. log.info("consumer message {}",message);
  17. //處理消息
  18. try{
  19. xxxService.doSomething(message);
  20. }catch(Exception ex){
  21. log.warn(String.format("message [%s] 消費失敗",message),ex);
  22. //拋出異常后,MQClient會返回ConsumeConcurrentlyStatus.RECONSUME_LATER,這條消息會再次嘗試消費
  23. throw new RuntimException(ex);
  24. }
  25. }
  26. }

RocketMQ用ACK機制保證NameServer知道消息是否被消費在

org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer里是這么處理的

  1. public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
  2. @SuppressWarnings("unchecked")
  3. @Override
  4. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  5. for (MessageExt messageExt : msgs) {
  6. log.debug("received msg: {}", messageExt);
  7. try {
  8. long now = System.currentTimeMillis();
  9. rocketMQListener.onMessage(doConvertMessage(messageExt));
  10. long costTime = System.currentTimeMillis() - now;
  11. log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
  12. } catch (Exception e) {
  13. log.warn("consume message failed. messageExt:{}", messageExt, e);
  14. context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
  15. return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  16. }
  17. }
  18. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  19. }
  20. }

以上為個人經驗,希望能給大家一個參考,也希望大家多多支持我們。

原文鏈接:https://blog.csdn.net/youxijishu/article/details/105042136

延伸 · 閱讀

精彩推薦
  • Java教程Java使用SAX解析xml的示例

    Java使用SAX解析xml的示例

    這篇文章主要介紹了Java使用SAX解析xml的示例,幫助大家更好的理解和學習使用Java,感興趣的朋友可以了解下...

    大行者10067412021-08-30
  • Java教程xml與Java對象的轉換詳解

    xml與Java對象的轉換詳解

    這篇文章主要介紹了xml與Java對象的轉換詳解的相關資料,需要的朋友可以參考下...

    Java教程網2942020-09-17
  • Java教程20個非常實用的Java程序代碼片段

    20個非常實用的Java程序代碼片段

    這篇文章主要為大家分享了20個非常實用的Java程序片段,對java開發項目有所幫助,感興趣的小伙伴們可以參考一下 ...

    lijiao5352020-04-06
  • Java教程Java8中Stream使用的一個注意事項

    Java8中Stream使用的一個注意事項

    最近在工作中發現了對于集合操作轉換的神器,java8新特性 stream,但在使用中遇到了一個非常重要的注意點,所以這篇文章主要給大家介紹了關于Java8中S...

    阿杜7472021-02-04
  • Java教程Java實現搶紅包功能

    Java實現搶紅包功能

    這篇文章主要為大家詳細介紹了Java實現搶紅包功能,采用多線程模擬多人同時搶紅包,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙...

    littleschemer13532021-05-16
  • Java教程升級IDEA后Lombok不能使用的解決方法

    升級IDEA后Lombok不能使用的解決方法

    最近看到提示IDEA提示升級,尋思已經有好久沒有升過級了。升級完畢重啟之后,突然發現好多錯誤,本文就來介紹一下如何解決,感興趣的可以了解一下...

    程序猿DD9332021-10-08
  • Java教程小米推送Java代碼

    小米推送Java代碼

    今天小編就為大家分享一篇關于小米推送Java代碼,小編覺得內容挺不錯的,現在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧...

    富貴穩中求8032021-07-12
  • Java教程Java BufferWriter寫文件寫不進去或缺失數據的解決

    Java BufferWriter寫文件寫不進去或缺失數據的解決

    這篇文章主要介紹了Java BufferWriter寫文件寫不進去或缺失數據的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望...

    spcoder14552021-10-18
主站蜘蛛池模板: 一本久久综合亚洲鲁鲁五月天 | 伊人中文| 色婷婷综合久久久中字幕精品久久 | 可以在线观看的黄色 | 久久久亚洲 | 99在线热视频 | 精品视频一区二区三区 | 精品视频在线一区 | 免费午夜视频 | 国产一级黄色大片 | 亚洲va国产天堂va久久 en | 激情视频综合网 | 在线色网站 | 久久精选| 亚洲精品乱码8久久久久久日本 | 精品一区二区电影 | 久久久久久久一区 | 国产在线精品一区 | 91视频观看 | 日韩视频一区二区三区 | 91嫩草国产露脸精品国产 | 国产欧美综合一区二区三区 | 欧美在线观看免费观看视频 | 欧美精品在欧美一区二区少妇 | 在线播放国产一区二区三区 | aaa欧美大片 | 国产成人一区二区三区 | 少妇精品久久久久久久久久 | 亚洲成人久久久 | 国产成人在线一区 | 亚洲三级在线 | www.伊人网 | 亚洲精品久久久久中文字幕欢迎你 | 色av综合| 亚洲综合视频 | 久久精品国产99 | 成人欧美一区二区三区白人 | 国产精品久久久久久久久久久久久 | 欧美在线视频一区 | 欧美自拍网| 日韩精品在线一区二区 |