国产片侵犯亲女视频播放_亚洲精品二区_在线免费国产视频_欧美精品一区二区三区在线_少妇久久久_在线观看av不卡

服務(wù)器之家:專注于服務(wù)器技術(shù)及軟件下載分享
分類導(dǎo)航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術(shù)|正則表達(dá)式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

香港云服务器
服務(wù)器之家 - 編程語言 - Java教程 - springboot+rabbitmq實(shí)現(xiàn)指定消費(fèi)者才能消費(fèi)的方法

springboot+rabbitmq實(shí)現(xiàn)指定消費(fèi)者才能消費(fèi)的方法

2022-03-08 00:52buguge Java教程

當(dāng)項(xiàng)目部署到測試環(huán)境后,QA測試過程中,總是“莫名其妙”的發(fā)現(xiàn)所保存的用戶付款單數(shù)據(jù)有問題。這篇文章主要介紹了springboot+rabbitmq實(shí)現(xiàn)指定消費(fèi)者才能消費(fèi),需要的朋友可以參考下

如何保證mq隊(duì)列里的消息只被測試服務(wù)器上的consumer消費(fèi),避免本地環(huán)境誤消費(fèi)?

程序里有一個應(yīng)用場景使用到了rabbitmq――當(dāng)財(cái)務(wù)確認(rèn)收到企業(yè)的打款金額后,系統(tǒng)會把企業(yè)訂單生成用戶付款單。由于訂單記錄數(shù)據(jù)量大,改為通過mq來異步實(shí)現(xiàn)。即財(cái)務(wù)確認(rèn)收款操作后,將企業(yè)訂單數(shù)據(jù)放入mq,另一端監(jiān)聽mq消息隊(duì)列,將收到的企業(yè)訂單加工轉(zhuǎn)換成用戶付款單,并做持久化。

springboot+rabbitmq實(shí)現(xiàn)指定消費(fèi)者才能消費(fèi)的方法

本地開發(fā)環(huán)境與測試環(huán)境共用一套rabbitmq。當(dāng)項(xiàng)目部署到測試環(huán)境后,QA測試過程中,總是“莫名其妙”的發(fā)現(xiàn)所保存的用戶付款單數(shù)據(jù)有問題。

當(dāng)然,首先要排查程序,檢查Consumer的數(shù)據(jù)處理的邏輯是否有bug。單元測試后,發(fā)現(xiàn)并不存在測試環(huán)境的bug。

原來,消息隊(duì)列被“非正常”消費(fèi)了!

Q: 什么情況?

A: 幾個伙伴一起參與的項(xiàng)目,大家總是要調(diào)試自己的程序的。而如果碰巧本地程序監(jiān)聽到消息隊(duì)列里有消息,那么,消息就被本地程序消費(fèi)掉了。問題正是出現(xiàn)在這里!――――團(tuán)隊(duì)開發(fā),大家并不會及時檢出git上最新的程序版本。如果本地的程序版本不是最新的正確的版本,勢必會出現(xiàn)bug。

那么,怎么辦?

每次你改了邏輯,告訴大家獲取最新?

不現(xiàn)實(shí),約定的東西往往不奏效的。

如何保證mq隊(duì)列里的消息只被測試服務(wù)器上的consumer消費(fèi),避免本地環(huán)境誤消費(fèi)? 或者說,如何實(shí)現(xiàn)消息的定向消費(fèi)呢?

只要肯琢磨,辦法總比困難多!百思可得解!

我們知道,rabbitmq手動ack模式。這還不夠,因?yàn)槲覀冊趺醋宑onsumer來決定是否消費(fèi)呢? 所以,我們需要一個標(biāo)識――――producer設(shè)定一個標(biāo)識,consumer如果匹配這個標(biāo)識,則消費(fèi),否則予以reject放回消息隊(duì)列。

springboot+rabbitmq實(shí)現(xiàn)指定消費(fèi)者才能消費(fèi)的方法

通過查看spring-rabbit/spring-amqp的代碼,發(fā)現(xiàn)可以在spring-amqp里的MessageProperties上做文章。生產(chǎn)者與消費(fèi)者每次消息傳輸都會攜帶一個MessageProperties,通常我們是不指定的,走M(jìn)essageProperties的默認(rèn)設(shè)置值。

我的策略:MessageProperties有一個屬性叫AppId。我們程序所部署的測試機(jī)器就一臺,即消息Producer和消息Consumer在一臺機(jī)器上。那么,我就可以利用機(jī)器的IP來識別消息。只有Producer與Consumer的IP匹配,才消費(fèi)消息。程序員本機(jī)IP與測試服務(wù)器IP不一樣,就會拒絕接收消息,會把消息重新放回消息隊(duì)列,等待測試服務(wù)器的Consumer消費(fèi)。

話不多說,上代碼吧,

生產(chǎn)者代碼:

package com.sboot.mq;

import org.junit.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import java.net.InetAddress;
import java.util.UUID;

public class MQProducerTest extends BaseTest {
  @Autowired
  RabbitTemplate rabbitTemplate;

  @Test
  public void test() throws Exception {
      for (int i = 1; i <= 5; i++) {
          MessageProperties messageProperties = new MessageProperties();
          String ip = InetAddress.getLocalHost().getHostAddress();
          messageProperties.setAppId(ip);
//            messageProperties.setUserId(String.valueOf(i));
          MessageConverter messageConverter = new SimpleMessageConverter();
          String msg = UUID.randomUUID().toString();
//            System.out.println(msg);
          Message message1 = messageConverter.toMessage(msg, messageProperties);
          rabbitTemplate.send(MessageQueueConstant.USER_SETTLEMENT_EXCHANGE, "UserSettlementRouting", message1);
          System.out.println("入隊(duì)完成");
          Thread.sleep(500L);
      }
  }
}

消費(fèi)者手動ACK,要實(shí)現(xiàn)ChannelAwareMessageListener接口,感知rabbitmq.client.Channel實(shí)例,調(diào)用channel的basicAck、basicReject等方法:

package com.sboot.mq;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;

import java.net.InetAddress;

@Component
@Profile(value = "dev")
@Slf4j
public class UserSettlementDevConsumer implements ChannelAwareMessageListener {

  @RabbitHandler
  @RabbitListener(queues = MessageQueueConstant.USER_SETTLEMENT_QUEUE, ackMode = "MANUAL")
  @Override
  public void onMessage(Message message, Channel channel) throws Exception {
      Thread.currentThread().setName(UserSettlementDevConsumer.class.getSimpleName() + System.currentTimeMillis());

      long tag = message.getMessageProperties().getDeliveryTag();
      String appId = message.getMessageProperties().getAppId();
      log.info("{}-{}, 消息出隊(duì)", tag, appId);
      String receiveMsg = "";
      try {
          //核對標(biāo)識,決定是否消費(fèi)消息
          String ip = InetAddress.getLocalHost().getHostAddress();
          if (!ip.equals(appId)) {
              log.info("這不是我需要的消息。放回隊(duì)列。{}", receiveMsg);
//                channel.basicNack(tag, false, true);
              channel.basicReject(tag, true);
//                channel.basicRecover(true);
              return;
          }

          MessageConverter messageConverter = new SimpleMessageConverter();
          receiveMsg = String.valueOf(messageConverter.fromMessage(message));
          。。。。在這里消費(fèi)消息
          log.info("success " + receiveMsg);
          channel.basicAck(tag, false);

      } catch (Exception e) {
          log.error("receive message has an error, ", e);
          channel.basicNack(tag, false, true);
      }
  }

}

說明一下依賴的spring-rabbit包的版本,我的是2.2.0.RELEASE。如果是2.1.4版本里,@RabbitListener注解沒有ackMode。

解決本案問題過程中的花絮:

springboot+rabbitmq實(shí)現(xiàn)指定消費(fèi)者才能消費(fèi)的方法

spring-rabbit-2.1.4.RELEASEspring-rabbit-2.2.0.RELEASE

springboot+rabbitmq實(shí)現(xiàn)指定消費(fèi)者才能消費(fèi)的方法

@RabbitListener的ackMode的值見枚舉org.springframework.amqp.core.AcknowledgeMode

NONE-- no acks(自動消費(fèi) autoAck)MANUAL --Manual acks - user must ack/nack via a channel aware listener.(手動消費(fèi),Consumer端必須顯式調(diào)用ack或nack)AUTO --

springboot+rabbitmq實(shí)現(xiàn)指定消費(fèi)者才能消費(fèi)的方法

設(shè)置了手動消費(fèi),上文消費(fèi)端的deliveryTag會是不同的long值。自動消費(fèi)的deliveryTag是重復(fù)的1和2這樣的。并且,自動消費(fèi)時,如果要使用channel的ack/nack,會報(bào)異常:

2020-06-19 22:26:54.586 [AMQP Connection 192.168.40.20:5672] ERROR o.s.a.rabbit.connection.CachingConnectionFactory:1468 - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
2020-06-19 22:26:54.599 [SimpleAsyncTaskExecutor-1] ERROR c.e.z.r.p.modules.mq.UserSettlementAckConsumer:49 -
org.springframework.amqp.AmqpException: PublisherCallbackChannel is closed
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1092)

到此這篇關(guān)于springboot+rabbitmq實(shí)現(xiàn)指定消費(fèi)者才能消費(fèi)的文章就介紹到這了,更多相關(guān)springboot rabbitmq消費(fèi)內(nèi)容請搜索服務(wù)器之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持服務(wù)器之家!

原文鏈接:https://www.cnblogs.com/buguge/p/13183980.html

延伸 · 閱讀

精彩推薦
1590
Weibo Article 1 Weibo Article 2 Weibo Article 3 Weibo Article 4 Weibo Article 5 Weibo Article 6 Weibo Article 7 Weibo Article 8 Weibo Article 9 Weibo Article 10 Weibo Article 11 Weibo Article 12 Weibo Article 13 Weibo Article 14 Weibo Article 15 Weibo Article 16 Weibo Article 17 Weibo Article 18 Weibo Article 19 Weibo Article 20 Weibo Article 21 Weibo Article 22 Weibo Article 23 Weibo Article 24 Weibo Article 25 Weibo Article 26 Weibo Article 27 Weibo Article 28 Weibo Article 29 Weibo Article 30 Weibo Article 31 Weibo Article 32 Weibo Article 33 Weibo Article 34 Weibo Article 35 Weibo Article 36 Weibo Article 37 Weibo Article 38 Weibo Article 39 Weibo Article 40
主站蜘蛛池模板: 久久免费国产 | 午夜寂寞少妇aaa片毛片 | 欧美一区二区在线刺激视频 | 国产精品美女久久久久久久久久久 | 免费成人高清在线视频 | 亚洲精品久久久久久一区二区 | 欧美成年黄网站色视频 | 色在线视频观看 | 欧美国产一区二区三区 | 精品视频网 | 毛片一区 | 国产精品久久久久久久久 | 日韩有码在线观看 | 亚洲网站在线 | 免费观看特级毛片 | 久久精品国产清自在天天线 | 精品玖玖玖 | 久草中文在线 | 伊人精品视频 | 成人日韩 | 久久久久久久久一区二区三区 | 国产一级片| 久草成人 | 国产在线不卡 | 一本一本久久a久久精品综合妖精 | 一区中文字幕 | 免费成人在线视频网站 | 99re国产 | 日韩在线观看 | 99视频精品 | 亚洲乱码国产乱码精品精的特点 | 91在线观看高清视频 | 日本一二三视频 | 亚洲成人av一区二区三区 | 欧美成人免费在线视频 | 韩国精品 | 精产品自偷自拍 | 欧美精品在欧美一区二区少妇 | 久久伊人一区 | 99国产精品99久久久久久 | 精品久久久久久亚洲综合网 |