1. 生產(chǎn)者模塊通過(guò)publisher confirm機(jī)制實(shí)現(xiàn)消息可靠性
1.1 生產(chǎn)者模塊導(dǎo)入rabbitmq相關(guān)依賴
1
2
3
4
5
6
7
8
9
10
|
<!--AMQP依賴,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!--用于mq消息的序列化與反序列化--> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> |
1.2 配置文件中進(jìn)行mq的相關(guān)配置
1
2
3
4
5
6
7
8
9
|
spring.rabbitmq.host= 10.128 . 240.183 spring.rabbitmq.port= 5672 spring.rabbitmq.virtual-host=/ spring.rabbitmq.publisher-confirm-type=correlated spring.rabbitmq.publisher-returns= true spring.rabbitmq.template.mandatory= true |
- publish-confirm-type:開啟publisher-confirm,有以下可選值
simple:同步等待confirm結(jié)果,直到超時(shí)
correlated:異步回調(diào),定義ConfirmCallback。mq返回結(jié)果時(shí)會(huì)回調(diào)這個(gè)ConfirmCallback
- publish-returns:開啟publish-return功能??梢远xReturnCallback
- template.mandatory: 定義消息路由失敗的策略
true:調(diào)用ReturnCallback
false:直接丟棄消息
1.3 定義ReturnCallback(消息投遞到隊(duì)列失敗觸發(fā)此回調(diào))
- 每個(gè)RabbitTemplate只能配置一個(gè)ReturnCallback。
- 當(dāng)消息投遞失敗,就會(huì)調(diào)用生產(chǎn)者的returnCallback中定義的處理邏輯
- 可以在容器啟動(dòng)時(shí)就配置這個(gè)回調(diào)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
@Slf4j @Configuration public class CommonConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // 獲取RabbitTemplate對(duì)象 RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate. class ); // 配置ReturnCallback rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { // 判斷是否是延遲消息 Integer receivedDelay = message.getMessageProperties().getReceivedDelay(); if (receivedDelay != null && receivedDelay > 0 ) { // 是一個(gè)延遲消息,忽略這個(gè)錯(cuò)誤提示 return ; } // 記錄日志 log.error( "消息發(fā)送到隊(duì)列失敗,響應(yīng)碼:{}, 失敗原因:{}, 交換機(jī): {}, 路由key:{}, 消息: {}" , replyCode, replyText, exchange, routingKey, message.toString()); // 如果有需要的話,重發(fā)消息 }); } } |
1.4 定義ConfirmCallback(消息到達(dá)交換機(jī)觸發(fā)此回調(diào))
可以為redisTemplate指定一個(gè)統(tǒng)一的確認(rèn)回調(diào)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
@Slf4j @Configuration public class CommonConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // 獲取RabbitTemplate對(duì)象 RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate. class ); // 配置ReturnCallback rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { // 判斷是否是延遲消息 Integer receivedDelay = message.getMessageProperties().getReceivedDelay(); if (receivedDelay != null && receivedDelay > 0 ) { // 是一個(gè)延遲消息,忽略這個(gè)錯(cuò)誤提示 return ; } // 記錄日志 log.error( "消息發(fā)送到隊(duì)列失敗,響應(yīng)碼:{}, 失敗原因:{}, 交換機(jī): {}, 路由key:{}, 消息: {}" , replyCode, replyText, exchange, routingKey, message.toString()); // 如果有需要的話,重發(fā)消息 }); // 設(shè)置統(tǒng)一的confirm回調(diào)。只要消息到達(dá)broker就ack=true rabbitTemplate.setConfirmCallback( new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean b, String s) { System.out.println( "這是統(tǒng)一的回調(diào)" ); System.out.println( "correlationData:" + correlationData); System.out.println( "ack:" + b); System.out.println( "cause:" + s); } }); } } |
也可以為特定的消息定制回調(diào)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
@Autowired private RabbitTemplate rabbitTemplate; @Test public void testmq() throws InterruptedException { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); correlationData.getFuture().addCallback(result->{ if (result.isAck()) { // ACK log.debug( "消息成功投遞到交換機(jī)!消息ID: {}" , correlationData.getId()); } else { // NACK log.error( "消息投遞到交換機(jī)失??!消息ID:{}" , correlationData.getId()); // 重發(fā)消息 } },ex->{ // 記錄日志 log.error( "消息發(fā)送失??!" , ex); // 重發(fā)消息 }); rabbitTemplate.convertAndSend( "example.direct" , "blue" , "hello,world" ,correlationData); } |
2. 消費(fèi)者模塊開啟消息確認(rèn)
2.1 添加配置
1
2
|
# 手動(dòng)ack消息,不使用默認(rèn)的消費(fèi)端確認(rèn) spring.rabbitmq.listener.simple.acknowledge-mode=manual |
- none:關(guān)閉ack,消息投遞時(shí)不可靠的,可能丟失
- auto:類似事務(wù)機(jī)制,出現(xiàn)異常時(shí)返回nack,消息回滾到mq,沒有異常,返回
- ackmanual:我們自己指定什么時(shí)候返回ack
2.2 manual模式在監(jiān)聽器中自定義返回ack
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
@RabbitListener (queues = "order.release.order.queue" ) @Service public class OrderCloseListener { @Autowired private OrderService orderService; @RabbitHandler private void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException { System.out.println( "收到過(guò)期的訂單信息,準(zhǔn)備關(guān)閉訂單" + orderEntity.getOrderSn()); try { orderService.closeOrder(orderEntity); // 第二個(gè)參數(shù)為false則表示僅確認(rèn)此條消息。如果為true則表示對(duì)收到的多條消息同時(shí)確認(rèn) channel.basicAck(message.getMessageProperties().getDeliveryTag(), false ); } catch (Exception e) { // 第二個(gè)參數(shù)為ture表示將這個(gè)消息重新加入隊(duì)列 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true ); } } } |
3. 消費(fèi)者模塊開啟消息失敗重試機(jī)制
3.1 配置文件添加配置,開啟本地重試
1
2
3
4
5
6
7
8
9
10
|
spring: rabbitmq: listener: simple: retry: enabled: true # 開啟消費(fèi)者失敗重試 initial-interval: 1000 # 初識(shí)的失敗等待時(shí)長(zhǎng)為 1 秒 multiplier: 1 # 失敗的等待時(shí)長(zhǎng)倍數(shù),下次等待時(shí)長(zhǎng) = multiplier * last-interval max-attempts: 3 # 最大重試次數(shù) stateless: true # true 無(wú)狀態(tài); false 有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為 false |
- 開啟本地重試,如果消息處理過(guò)程總拋出異常,不會(huì)requeue到隊(duì)列,而是在消費(fèi)者本地重試
- 重試達(dá)到最大次數(shù)后,spring會(huì)返回ack,消息會(huì)被丟棄
4. 消費(fèi)者模塊添加失敗策略(用于開啟失敗本地重試功能后)
- 當(dāng)開啟本地重試后,重試最大次數(shù)后消息直接丟棄。
- 三種策略,都繼承于MessageRecovery接口
- RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息。默認(rèn)就是這種方式
- ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊(duì)
- RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機(jī)
4.2 定義處理失敗消息的交換機(jī)和隊(duì)列 沒有會(huì)自動(dòng)創(chuàng)建相應(yīng)的隊(duì)列、交換機(jī)與綁定關(guān)系,有了就啥也不做
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
@Bean public DirectExchange errorMessageExchange(){ return new DirectExchange( "error.direct" ); } @Bean public Queue errorQueue(){ return new Queue( "error.queue" , true ); } // 路由鍵為key @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){ return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with( "error" ); } |
4.3 向容器中添加一個(gè)失敗策略組件
1
2
3
4
5
|
@Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ // error為路由鍵 return new RepublishMessageRecoverer(rabbitTemplate, "error.direct" , "error" ); } |
到此這篇關(guān)于springboot中rabbitmq實(shí)現(xiàn)消息可靠性的文章就介紹到這了,更多相關(guān)springboot rabbitmq消息可靠性內(nèi)容請(qǐng)搜索服務(wù)器之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持服務(wù)器之家!
原文鏈接:https://blog.csdn.net/weixin_44390164/article/details/120455793