消息服務概述
為什么要使用消息服務
在多數應用尤其是分布式系統中,消息服務是不可或缺的重要部分,它使用起來比較簡單,同時解決了不少難題,例如異步處理、應用解耦、流量削峰、分布式事務管理等,使用消息服務可以實現一個高性能、高可用、高拓展的系統。下面我們使用實際開發中的若干場景來分析和說明為什么要使用消息服務,以及使用消息服務的好處。
異步處理
場景說明:用戶注冊后,系統需要將信息寫入數據庫,并發送注冊郵件和注冊短信通知
在圖8-1中,針對上述注冊業務的場景需求,處理方法有3種。
1)串行處理方式:用戶發送注冊請求后,服務器會先將注冊信息寫入數據庫,依次發送注冊郵件和短信消息,服務器只有在消息處理完畢后才會將處理結果返回客戶端。這種串行處理消息的方式非常耗時,用戶體驗不友好。
2)并行處理方式:用戶發送注冊請求后,將注冊信息寫入數據庫,同時發送注冊郵件和短信,最后返回給客戶端,這種并行處理的方式在一定程度上提高了后臺業務處理的效率,但如果遇到較為耗時的業務處理,仍然顯得不夠完善。
3)消息服務處理方式:可以在業務中嵌入消息服務進行業務處理,這種方式先將注冊信息寫入數據庫,在極短的時間內將注冊信息寫入消息隊列后即可返回響應信息。此時前端業務不需要理會不相干的后臺業務處理,而發送注冊郵件和短息的業務會自動讀取消息隊列中的相關信息進行后續業務處理。
應用解耦
場景說明:用戶下單后,訂單服務需要通知庫存服務。
如果使用傳統方式處理訂單業務,用戶下單后,訂單服務會直接調用庫存服務接口進行庫存更新,這種方式有一個很大的問題是:一旦庫存系統出現異常,訂單服務會失敗導致訂單丟失。如果使用消息服務模式,訂單服務的下訂單消息會快速寫入消息隊列,庫存服務會監聽并讀取到訂單,從而修改庫存。相較于傳統方式,消息服務模式顯得更加高效、可靠。
流量削峰
場景說明:秒殺活動是流量削峰的一種應用場景,由于服務器處理資源能力有限,因此出現峰值時很容易造成服務器宕機、用戶無法訪問的情況。為了解決這個問題,通常會采用消息隊列緩沖瞬時高峰流量,對請求進行分層過濾,從而過濾掉一些請求。
針對上述秒殺業務的場景需求,如果專門增設服務器來應對秒殺活動期間的請求瞬時高峰的話,在非秒殺活動期間,這些多余的服務器和配置顯得有些浪費;如果不進行有效處理的話,秒殺活動瞬時高峰流量請求有可能壓垮服務,因此,在秒殺活動中加入消息服務是較為理想的解決方案。通過在應用前端加入消息服務,先將所有請求寫入到消息隊列,并限定一定的閾值,多余的請求直接返回秒殺失敗,秒殺服務會根據秒殺規則從消息隊列中讀取并處理有限的秒殺請求。
分布式事務管理
場景說明:在分布式系統中,分布式事務是開發中必須要面對的技術難題,怎樣保證分布式系統的請求業務處理的數據一致性通常是要重點考慮的問題。針對這種分布式事務管理的情況,目前較為可靠的處理方式是基于消息隊列的二次提交,在失敗的情況可以進行多次嘗試,或者基于隊列數據進行回滾操作。因此,在分布式系統中加入消息服務是一個既能保證性能不變,又能保證業務一致性的方案。
針對上述分布式事務管理的場景需求,如果使用傳統方式在訂單系統中寫入訂單支付成功信息后,再遠程調用庫存系統進行庫存更新,一旦庫存系統異常,很有可能導致庫存更新失敗而訂單支付成功的情況,從而導致數據不一致。針對這種分布式系統的事務管理,通常會在分布式系統之間加入消息服務進行管理。訂單支付成功后,寫入消息表;然后定時掃描消息表消息寫入到消息隊列中,庫存系統會立即讀取消息隊列中的消息進行庫存更新,同時添加消息處理狀態;接著,庫存系統向消息隊列中寫入庫存處理結果,訂單系統會立即讀取消息隊列中的庫存處理狀態。接著,庫存系統向消息隊列中寫入庫存處理結果,訂單系統會立即讀取消息隊列中的庫存處理狀態。如果庫存服務處理失敗,訂單服務還會重復掃描并發送消息表中的消息,讓庫存系統進行最終一致性的庫存更新。如果處理成功,訂單服務直接刪除消息表數據,并寫入到歷史消息表。
常用消息中間件介紹
消息隊列中間件(簡稱消息中間件)是指利用高效可靠的消息傳遞機制進行與平臺無關的數據交流,并基于數據通信來進行分布式系統的集成。目前開源的消息中間件有很多。
ActiveMQ
ActiveMQ是Apache公司出品的,采用Java語言編寫的、完全基于JMS規范(Java Message Service)的、面向消息的中間件,它為應用程序提供高效、可拓展的、穩定的、安全的企業級消息通信。ActiveMQ豐富的API和多種集群構建模式使得它成為業界老牌的消息中間件,廣泛應用于中小型企業中。相較于后續出現的RabbitMQ、RocketMQ、Kafka等消息中間件來說,ActiveMQ性能相對較弱,在如今的高并發、大數據處理的場景下顯得力不從心,經常會出現一些問題,例如消息延遲、堆積、堵塞等。
RabbitMQ
RabbitMQ是使用Erlang語言開發的開源消息隊列系統,基于AMQP協議(Advanced Message Queuing Protocol)實現。AMQP是為應對大規模并發活動而提供統一消息服務的應用層標準高級消息隊列協議,專門為面向消息的中間件設計,該協議更多用在企業系統內,對數據一致性、穩定性和可靠性要求很高的場景,對性能和吞吐量的要求還在其次。正是基于AMQP協議的各種優勢性能,使得RabbitMQ消息中間件在應用開發中越來越受歡迎。
Kafka
Kafka是由Apache軟件基金會開發的一個開源流處理平臺,它是一種高吞吐量的分布式發布訂閱消息系統,采用Scala和Java語言編寫,提供了快速、可拓展的、分布式的、分區的和可復制的日志訂閱服務,其主要特定是追求高吞吐量,適用于產生大量數據的互聯網服務的數據收集業務。
RocketMQ
RocketMQ是阿里巴巴公司開源產品,目前也是Apache公司的頂級項目,使用純Java開發,具有高吞吐量、高可用、適合大規模分布式系統應用的特點。RocketMQ的思路起源于Kafka,對消息的可靠傳輸以及事務性做了優化,目前在阿里巴巴中被廣泛應用于交易、充值、流計算、消息推送、日志流式處理場景,不過維護上稍微麻煩。
在實際項目技術選型時,在沒有特別要求的場景下,通常會選擇使用RabbitMQ作為消息中間件,如果針對的是大數據業務,推薦使用Kafka或者是RocketMQ作為消息中間件。
RabbitMQ消息中間件
RabbitMQ簡介
RabbitMQ是基于AMQP協議的輕量級、可靠、可伸縮和可移植的消息代理,Spring使用RabbitMQ通過AMQP協議進行通信;在Spring Boot中對RabbitMQ進行了集成管理。
在所有的消息服務中,消息中間件都會作為一個第三方消息代理,接收發布者發布的消息,并推送給消息消費者。不同消息中間件內部轉換消息的細節不同。
RabbitMQ的消息代理流程中有很多細節內容和內部組件,這里不必會組件的具體作用,先對整個流程梳理一遍。
- 消息發布者(Publisher,簡稱P)向RabbitMQ代理(Broker)指定的虛擬主機服務器(Virtual Host)發送消息。
- 虛擬主機服務器內部的交換器(Exchange,簡稱X)接收消息,并將消息傳遞并存儲到與之綁定(Binding)的消息隊列(Queue)中。
- 消息消費者(Consumer,簡稱C)通過一定的網絡連接(Connection)與消息代理建立連接,同時為了簡化開支,在連接內部使用了多路復用的信道進行消息的最終消費。
RabbitMQ工作模式介紹
RabbitMQ消息中間件針對不同的服務需求,提供了多種工作模式。
Work queues(工作隊列模式)
在Work queues工作模式中,不需要設置交換器(RabbitMQ會使用內部默認交換器進行消息轉換),需要指定唯一的消息隊列進行消息傳遞,并且開源由多個消息消費者。在這種模式下,多個消息消費者通過輪詢的方式依次接收消息隊列中存儲的消息,一旦消息被某個消費者接收,消息隊列會將消息移除,而接收并處理消息的消費者必須在消費完一條消息后再準備接收下一條消息。
從上面的分析可以發現,Work queues工作模式適用于那些較為繁重,并且可以進行拆分處理的業務,這種情況下可以分派給多個消費者輪流處理業務。
Public/Subscribe(發布訂閱模式)
在Public/Subscribe工作模式中,必須先配置一個fanout類型的交換器,不需要指定對應的路由鍵(Routing key),同時會將消息路由到每一個消息隊列上,然后每個消息隊列都可以對相同的消息進行接收存儲,進而由各自消息隊列關聯的消費者進行消費。
從上面的分析可以發現,Publish/Subscribe工作模式適用于進行相同業務功能處理的場合。例如,用戶注冊成功后,需要同時發送郵件通知和短信通知,那么郵件服務消費者和短信服務消費者需要共同消費"用戶注冊成功"這一條消息。
Routing(路由模式)
在Routing工作模式中,必須先配置一個direct類型的交換器,并指定不同的路由鍵值(Routing key)將對應的消息從交換器路由到不同的消息隊列進行存儲,由消費者進行各自消費。
從上面的分析可以發現,Routing工作模式適用于進行不同類型消息分類處理的場合。例如,日志收集處理,用戶可以配置不同的路由鍵值分別對不同級別的日志信息進行分類處理。
Topics(通配符模式)
在Topics工作模式中,必須先配置一個topic類型的交換器,并指定不同的路由鍵值(Routing key)將對應的消息從交換器路由到不同的消息隊列進行存儲,然后由消費者進行各自消費。Topics模式與Routing模式的主要在于:Topics模式設置的路由鍵是包括通配符的,其中,#匹配多個字符,*匹配一個字符,然后與其他字符一起使用.進行連接,從而組成動態路由鍵,在發送消息時可以根據需求設置不同的路由鍵,從而將消息路由到不同的消息隊列。
通常情況下,Topics工作模式適用于根據不同需求動態傳遞處理業務的場合。例如一些訂閱客戶只接收郵件消息,一些訂閱客戶只接收短信消息,那么可以根據客戶需求進行動態路由匹配,從而將訂閱消息分發到不同的消息隊列中。
RPC
RPC工作模式與Work queues工作模式主體流程相似,都需要設置交換器,需要指定唯一的消息隊列進行消息傳遞。RPC模式與Work queues模式的主要不同在于:RPC模式是一個回環結構,主要針對分布式架構的消息傳遞。RPC模式與Work queues模式的主要不同在于:RPC模式是一個回環結構,主要針對分布式架構的消息傳遞業務,客戶端Cilent先發送消息到消息隊列,遠程服務端Server獲取消息,然后再寫入另一個消息隊列,向原始客戶端Client相應消息處理結果。
RPC工作模式適用于遠程服務調用的業務處理場合。例如,在分布式架構中必須考慮的分布式事務管理問題。
Headers
Headers工作模式在RabbitMQ所支持的工作模式中是較為少用的一種模式,其主體流程與Routing工作模式有些相似。不過,使用Headers工作模式時,必須設置一個headers類型的交換器,而不需要設置路由鍵,取而代之的是在Properties屬性配置中的headers頭信息中使用key/value的形式配置路由規則。由于Headers工作模式使用較少,官方文檔也滅有詳細說明。
上面的6中工作模式,有些可以嵌套使用,例如,在發布訂閱模式中加入工作隊列模式。其中Publish/Subscribe、Routing、Topics和RPC模式是開發中較為常用的工作模式。
RabbitMQ安裝以及整合環境搭建
安裝RabbitMQ
在使用RabbitMQ之前必須預先安裝配置,參考RabbitMQ官網說明,RabbitMQ支持多平臺安裝,例如Linux、Windows、MacOS、Docker等。這里,我們為了方便開發使用Windows環境為例,介紹RabbitMQ的安裝配置。
下載RabbitMQ
鏈接:https://pan.baidu.com/s/1REAC7btmaR7a-pLKfLGJqA
提取碼:1234
在安裝RabbitMQ之前需要Erlang語言包支持。
安裝RabbitMQ
RabbitMQ安裝包依賴于Erlang語言包的支持,所以需要先安裝Erlang語言包,在安裝RabbitMQ安裝包。RabbitMQ安裝包和Erlang語言包的安裝都非常簡單。(需要注意的是,安裝Erlang語言包,必須以管理員的身份進行安裝)。
在Windos環境下首先執行RabbitMQ的安裝,系統環境變量中會自動增加一個變量名為ERLANG_HOME的變量配置,它的配置路徑是Erlang選擇安裝的具體路徑,無須手動修改,同時,RabbitMQ服務也會自動啟動。如果是多次卸載安裝的RabbitMQ,需要保證ERLANG_HOME環境的配置正確,同時保證RabbitMQ服務正常啟動。
RabbitMQ可視化效果展示
查看開啟服務
rabbitmq-plugins.bat list
可以看到managerment服務沒有開啟
開啟可視化服務
rabbitmq-plugins enable rabbitmq_management
重啟rabbitmqctl
rabbitmqctl.bat start_app
RabbitMQ默認提供了兩個端口號5672和15672,其中5672作為服務端口號,15672用作可視化管理端口號。在瀏覽器上訪問http://localhost:15672通過可視化的方式查看RabbitMQ。
首次登錄RabbitMQ可視化管理頁面時需要進行用戶登錄,RabbitMQ安裝過程中默認提供了用戶名和密碼均為guest的用戶,可以使用該賬戶進行登錄。登陸成功后會進入RabbitMQ可視化管理頁面得首頁。
RabbitMQ可視化管理頁面中,顯示出了RabbitMQ的版本、用戶信息等信息,同時頁面還包括Connections、Channeis、Exchanges、Queues、Admin在內的管理面板。
Spring Boot整合RabbitMQ環境搭建
完成RabbitMQ的安裝后,下面我們開始對Spring Boot整合RabbitMQ實現消息服務需要的整合環境進行搭建。
1)創建Spring Boot項目。使用Spring Intializr方式創建一個名為chapter08的Spring Boot項目,在Dependercies依賴選擇中選擇Web模塊中Web依賴以及Integertion模塊中的RabbitMQ依賴。
2)編寫配置文件,連接RabbitMQ服務。打開創建項目時自動生成的application.properties全局配置文件,在該文件中編寫RabbitMQ服務對應的連接配置。
application.properties
#配置RabbitMQ消息中間件連接配置 spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #配置RabbitMQ虛擬主機路徑/,默認可以省略 spring.rabbitmq.virtual-host=/
需要強調的是,在上述項目全局配置文件application.properties中,編寫了外部RabbitMQ消息中間件的連接配置,這樣在進行整合消息服務時,使用的都是我們自己安裝配置的RabbitMQ服務。而在Spring Boot中,也集成了一個內部默認的RabbitMQ中間件,如果我們沒有在配置文件中配置外部RabbtiMQ連接,會啟用內部的RabbitMQ中間件,這種內部RabbitMQ中間件是不推薦使用的。
Spring Boot與RabbitMQ整合實現
Publish/Subscribe(發布訂閱模式)
Spring Boot整合RabbitMQ中間件實現消息服務,注意圍繞3個部分的工作進行展開:定制中間件、消息發送者發送消息、消息消費者接收消息。其中。定制中間件是比較麻煩的工作,且必須預先定制。下面我們以用戶注冊成功后同時發送郵件通知和短信通知這一場景為例,分別使用基于API、基于配置類和基于注解這3種方式實現Publish/Subscribe工作模式的整合。
基于API的方式
基于API的方式注意講的是使用Spring框架提供的API管理類AmqpAdmin定制消息發送組件,并進行消息發送。這種定制消息發送組件的方式與RabbitMQ可視化界面上通過對應面板進行組件操作的實現基本一樣,都是通過管理員的身份,預先手動聲明交換器、隊列、路由鍵等,然后組裝消息隊列供應用程序調用,從而實現消息服務。
1)使用AmqpAdmin定制消息發送組件
打開chapter08項目的測試類Chapter08ApplicationTests,在該測試類中先引入AmqpAdmin管理類定制Publish/Subscribe工作模式所需的消息組件。
Chapter08ApplicationTests.java
package com.example.chapter08; import org.junit.jupiter.api.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest class Chapter08ApplicationTests { @Autowired private AmqpAdmin amqpAdmin; @Test public void amqpAdmin() { //1.定義fanout類型的交換器 amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange")); //2.定義兩個默認持久化隊列,分別處理email和sms amqpAdmin.declareQueue(new Queue("fanout_queue_email")); amqpAdmin.declareQueue(new Queue("fanout_queue_sms")); //3.將隊列分別與交換器進行綁定 amqpAdmin.declareBinding(new Binding("fanout_queue_email",Binding.DestinationType.QUEUE,"fanout_exchange","",null)); amqpAdmin.declareBinding(new Binding("fanout_queue_sms",Binding.DestinationType.QUEUE,"fanout_exchange","",null)); } }
使用Spring框架提供的消息管理組件AmqpAdmin定制了消息組件。其中amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange"));定義了一個fanout類型的交換器fanout_exchange。amqpAdmin.declareQueue(new Queue("fanout_queue_email")); amqpAdmin.declareQueue(new Queue("fanout_queue_sms"));定義了兩個消息隊列fanout_queue_email和fanout_queue_sms,分別用來處理郵件信息和短信信息。
amqpAdmin.declareBinding(new Binding("fanout_queue_email",Binding.DestinationType.QUEUE,"fanout_exchange","",null)); amqpAdmin.declareBinding(new Binding("fanout_queue_sms",Binding.DestinationType.QUEUE,"fanout_exchange","",null));將定義的兩個隊列分別與交換器綁定。
執行上述單元測試方法amqpAdmin(),驗證RabbitMQ消息組件的定制效果。
執行成功后,通過RabbitMQ可視化管理頁面的Exchanges面板查看效果。
通過上述操作可以發現,在管理頁面中提供了消息組件交換器、隊列的定制功能。在程序中使用Spring框架提供的管理員API組件AmqpAdmin定制消息組件和管理頁面上手動定制消息組件的本質是一樣的。
2)消息發送者發送消息
完成消息組件的定制工作后,創建消息發送者發送消息到消息隊列中。發送消息時,借助一個實體類傳遞消息,需要預先創建一個實體類對象。
首先,在chapter08項目中創建名為com.example.chapter.domain的包,并在該包下創建一個實體類User。
package com.example.chapter08.domain; public class User { private Integer id; private String username; public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } @Override public String toString() { return "User{" + "id=" + id + ", username='" + username + '\'' + '}'; } }
其次,在項目測試類Chapter08ApplicationTests中使用Spring框架提供的RabbitTemplate模板類是實現消息發送。
@Autowired private RabbitTemplate rabbitTemplate; @Test public void psubPublisher(){ User user=new User(); user.setId(1); user.setUsername("石頭"); rabbitTemplate.convertAndSend("fanout_exchange","",user); }
上述代碼中,先使用@Autowired注解引入了進行消息中間件管理的RabbitTemplate組件對象,然后使用該模板工具類的convertAndSend(String exchange, String routingKey, Object object)方法進行消息發布。其中,第一個參數表示發送消息的交換器,這個參數值要與之前定制的交換器名稱一致;第二個參數表示路由鍵,因為實現的是Public/Subscribe工作模式,所以不需要指定;第3個參數是發送的消息內容,接收Object類型。
然后,執行測試方法。
顯示消息發送過程中默認使用了SimpleMessageConverter轉換器進行消息轉換存儲,該轉換器只支持字符串或實體對象序列化后的消息。而測試類中發送的是User實體類對象消息,所以發生異常。
解決方法
執行JDK自帶的Serializable序列化接口定制其他類型的消息轉換器。
兩種方法都可行,但是相對于第二種實現方式而言,第一種方式實現后的可視化效果較差,轉換后的消息無法辨別,所以一般使用第二種方式。
在chapter08項目中創建名為com.example.chapter08.config的包,并在該包下創建一個RabbitMQ消息配置類RabbitMQConfig。
package com.example.chapter08.config; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }
創建一個RabbitMQ消息配置類RabbitMQConfig,并在該配置類中通過@Bean注解自定義一個Jackson2JsonMessageConverter類型的消息轉換器組件,該組件的返回值必須為MessageConverter類型。
再次執行psubPublisher()方法,該方法執行成功后,查看可視化界面
3)消息消費者接收消息
在chapter08項目中創建名為com.example.chapter08.service的包,并在該包下創建一個針對RabbitMQ消息中間件進行消息接收和處理的業務類RabbitMQService。
package com.example.chapter08.service; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service public class RabbitMQService { /* * Publish/Subscribe工作模式接收,處理郵件業務 * */ @RabbitListener(queues = "fanout_queue_email") public void psubConsumerEmail(Message message){ byte[] body=message.getBody(); String s=new String(body); System.out.println("郵件業務接收到消息:" + s); } /* * Publish/Subscribe工作模式接收,處理短信業務 * */ @RabbitListener(queues = "fanout_queue_sms") public void psubConsumerSms(Message message){ byte[] body=message.getBody(); String s=new String(body); System.out.println("短信業務接收到消息:" + s); } }
創建一個接受處理RabbitMQ消息的業務處理類RabbitMQService,在該類中使用Spring框架提供的@RabbitListener注解監聽隊列名稱為fanout_queue_email和fanout_queue_sms的消息,監聽這兩個隊列是前面指定發送并存儲消息的消息隊列。
需要說明的是,使用@RabbitListener注解監聽隊列消息后,一旦服務啟動且監聽到指定的隊列有消息存在(目前兩個隊列中各有一條相同的消息),對應注解的方法會立即接收并消費隊列中的消息。另外,在接受消息的方法中,參數類型可以與發送的消息類型保持一致,或者使用Object類型和Message類型。如果使用消息類型對應的參數接收消息的話,只能夠得到具體的消息體信息;如果使用Object或者Message類型參數接收消息的話,還可以獲得除了消息體外的參數信息MessageProperties。
案例中使用的是開發中常用的@RabbitLIsenter注解監聽指定名稱的消息情況,這種方式會在監聽到指定隊列存在消息后立即進行消費處理。除此之外,還可以使用RabbitTemplate模板類的receiveAndConvert(String queueName)方法手動消費指定隊列中的消息。
基于配置類的方式
基于配置類的方式主要講的是使用Spring Boot框架提供的@Configuration注解配置類定制消息發送組件,并進行消息發送。
RabbitMQConfig.java
package com.example.chapter08.config; import org.springframework.amqp.core.*; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { //自定義消息轉化器 @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } //1.定義fanout類型的交換器 @Bean public Exchange fanout_exchange(){ return ExchangeBuilder.fanoutExchange("fanout_exchange").build(); } //2.定義兩個不同名稱的消息隊列 @Bean public Queue fanout_queue_email(){ return new Queue("fanout_queue_email"); } @Bean public Queue fanout_queue_sms(){ return new Queue("fanout_queue_sms"); } //3.將兩個不同名稱的消息隊列與交換器進行綁定 @Bean public Binding bindingEmail(){ return BindingBuilder.bind(fanout_queue_email()).to(fanout_exchange()).with("").noargs(); } @Bean public Binding bindingSms(){ return BindingBuilder.bind(fanout_queue_sms()).to(fanout_exchange()).with("").noargs(); } }
使用@Bean注解定制了3種類型的Bean組件,這3種組件分別表示交換器、消息隊列和消息隊列與綁定器的綁定。這種基于配置類方式定制的消息組件內容和基于API方式定制的消息組件內容完全一樣,只不過是實現方式不同而已。
按照消息服務整合實現步驟,完成消息組件的定制后,還需要編寫消息發送者和消息消費者,而在基于API的方式中已經實現類消息發送者和消息消費者,并基于配置類方式定制的消息組件名稱和之前測試用的消息發送和消息組件名稱都是一樣的,所以這里可以直接重復使用。
重新運行消息發送者測試方法psubPublisher(),消息消費者可以自動監聽并消費消息隊列種存在的消息,效果與基于API的方式測試效果一樣。
基于注解的方式
基于注解的方式指的是使用Spring框架的@RabbitListener注解定制消息發送組件并發送消息。
打開進行消息接收和處理的業務類RabbitMQService,將針對郵件業務和短線業務處理的消息消費者方式進行注解,使用@RabbitListener注解機器相關屬性定制消息發送組件。
RabbitMQService.java
package com.example.chapter08.service; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service public class RabbitMQService { /* * Publish/Subscribe工作模式接收,處理郵件業務 * */ //@RabbitListener(queues = "fanout_queue_email") @RabbitListener(bindings = @QueueBinding(value= @Queue("fanout_queue_email") ,exchange = @Exchange(value = "fanout_exchange" ,type = "fanout"))) public void psubConsumerEmail(Message message){ byte[] body=message.getBody(); String s=new String(body); System.out.println("郵件業務接收到消息:" + s); } /* * Publish/Subscribe工作模式接收,處理短信業務 * */ //@RabbitListener(queues = "fanout_queue_sms") @RabbitListener(bindings = @QueueBinding(value= @Queue("fanout_queue_sms") ,exchange = @Exchange(value = "fanout_exchange" ,type = "fanout"))) public void psubConsumerSms(Message message){ byte[] body=message.getBody(); String s=new String(body); System.out.println("短信業務接收到消息:" + s); } }
至此,在Spring Boot中完成了使用基于API、基于配置類和基于注解3種方式來實現Publish/Subscribe工作模式的整合講解。在這3種實現消息服務的方式中,基于API的方式相對簡單、直觀,但容易與業務代碼產生耦合;基于配置類的方式相對隔離、容易統一管理、符合Spring Boot框架思想;基于注解的方式清晰明了、方便各自管理,但是也容易與業務代碼產生耦合。在實際開發中,使用基于配置類的方式和基于注解的方式定制組件實現消息服務較為常見,使用基于API的方式偶爾使用。
Routing(路由模式)
使用基于注解的方式定制消息組件和消費者
RabbitMQService
/* * 2.1路由模式消息接收、處理error級別日志信息 * */ @RabbitListener(bindings = @QueueBinding( value = @Queue("routing_queue_error"), exchange = @Exchange(value = "routing_exchange",type = "direct"), key = "error_routing_key" )) public void routingConsumerError(String message){ System.out.println("接收到error級別日志消息:" + message); } /* * 2.2路由模式消息接收、處理info、error、warning級別日志信息 * */ public void routingConsumerAll(String message){ System.out.println("接收到info、error、warning等級別日志消息:" + message); }
上述代碼中,在消息業務處理類RabbitMQService中新增了兩個用來處理Routing路由模式的消息消費者方法,在兩個消費者方式上使用@RabbitListener注解及其相關屬性定制了路由模式下的消息服務組件。從示例代碼可以看出,與發布訂閱模式下的注解相比,Routing路由模式下的交換器類型type屬性為direct,而且還必須指定key屬性(每個消息隊列可以映射多個路由鍵,而在Spring Boot 1.X版本中,@QueueBinding中的key屬性只接收Spring類型而不接收Spring[]類型)。
消息發送者發送消息
打開項目測試類Chapter08ApplicationTests,在該測試類中使用RabbitTemplate模板類實現Routing路由模式下的消息發送。
// 2.Routing工作模式消息發送端 @Test public void routingPublish(){ rabbitTemplate.convertAndSend("routing_exchange","error_routing_key","routing send error message"); }
在路由工作模式下發送消息時,必須指定路由鍵參數,該參數要與消息隊列映射的路由鍵保持一致,否則發送的消息將會丟失。本次示例中使用的是error_routing_key路由鍵,根據定制規則,編寫的兩個消息消費者方式應該都可以正常接收并消費該發送端的消息。
Topics(通配符模式)
使用基于注解的方式定制消息組件和消息消費者
/* * 3.1通配符模式消息接收、進行郵件業務訂閱處理 * */ @RabbitListener(bindings = @QueueBinding( value = @Queue("topic_queue_email"), exchange = @Exchange(value = "topic_exchange",type = "topic"), key = "info.#.email.#" )) public void topicConsumerEmail(String message){ System.out.println("接收到郵件訂閱需求處理消息:" + message); } /* * 3.2通配符消息接收、進行短信業務訂閱處理 * */ @RabbitListener(bindings = @QueueBinding( value = @Queue("topic_queue_sms"), exchange = @Exchange(value = "topic_exchange",type = "topic"), key = "info.#.sms.#" )) public void topicConsumerSms(String message){ System.out.println("接收到短信訂閱需求處理消息:" + message); }
@Test public void topicPublisher(){ rabbitTemplate.convertAndSend("topic_exchange","info.email","topics send email message"); }
到此這篇關于Java Spring Boot消息服務萬字詳解分析的文章就介紹到這了,更多相關Java Spring Boot消息服務內容請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持服務器之家!
原文鏈接:https://blog.csdn.net/shi_zi_183/article/details/120907864