一、簡介
在微服務架構的系統中,我們通常會使用輕量級的消息代理來構建一個共用的消息主題讓系統中所有微服務實例都連接上來,由于該主題中產生的消息會被所有實例監聽和消費,所以我們稱它為消息總線。
二、消息代理
消息代理(message broker)是一種消息驗證、傳輸、路由的架構模式。它在應用程序之間起到通信調度并最小化應用之間的依賴的作用,使得應用程序可以高效地解耦通信過程。消息代理是一個中間件產品,它的核心是一個消息的路由程序,用來實現接收和分發消息, 并根據設定好的消息處理流來轉發給正確的應用。 它包括獨立的通信和消息傳遞協議,能夠實現組織內部和組織間的網絡通信。設計代理的目的就是為了能夠從應用程序中傳入消息,并執行一些特別的操作,下面這些是在企業應用中,我們經常需要使用消息代理的場景:
- 將消息路由到一個或多個目的地。
- 消息轉化為其他的表現方式。
- 執行消息的聚集、消息的分解,并將結果發送到它們的目的地,然后重新組合響應返回給消息用戶。
- 調用web服務來檢索數據。
- 響應事件或錯誤。
- 使用發布-訂閱模式來提供內容或基千主題的消息路由。
目前已經有非常多的開源產品可以供大家使用, 比如:
- activemqkafka
- rabbitmq
- rocketmq
- 等......
三、springcloud+rabbitmq
(1)rabbitmq簡介、安裝不贅述。
(2)pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
|
<dependencies> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-amqp</artifactid> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-test</artifactid> <scope>test</scope> </dependency> </dependencies> |
(3)application.yml
1
2
3
4
5
6
7
8
|
spring: application: name: rabbitmq-hello rabbitmq: host: ***.***.***.*** port: 5672 username: guest password: guest |
(4)發送者sender
1
2
3
4
5
6
7
8
9
10
11
12
13
|
@component public class sender { private static final logger log = loggerfactory.getlogger(sender. class ); @autowired private amqptemplate amqptemplate; public void send() { string context = "hello " + new date(); log.info( "sender : " + context); this .amqptemplate.convertandsend( "hello" , context); } } |
(5)接受者receiver
1
2
3
4
5
6
7
8
9
10
11
|
@component @rabbitlistener (queues = "hello" ) public class receiver { private static final logger log = loggerfactory.getlogger(receiver. class ); @rabbithandler public void process(string hello) { log.info( "receiver : " + hello); } } |
(6)創建rabbitmq的配置類 rabbitconfig
1
2
3
4
5
6
7
8
|
@configuration public class rabbitconfig { @bean public queue helloqueue(){ return new queue( "hello" ); } } |
(7)創建單元測試類, 用來調用消息生產
1
2
3
4
5
6
7
8
9
10
11
12
|
@runwith (springjunit4classrunner. class ) @springboottest (classes = springcloudbusrabbitmqapplication. class ) public class helloapplicationtests { @autowired private sender sender; @test public void hello() throws exception { sender.send(); } } |
(8)測試,執行helloapplicationtests
(9)訪問host:15672
四、改造config-client(整合springcloud bus)
(1)pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
<dependencies> <dependency> <groupid>org.springframework.cloud</groupid> <artifactid>spring-cloud-starter-config</artifactid> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-web</artifactid> </dependency> <dependency> <groupid>org.springframework.cloud</groupid> <artifactid>spring-cloud-starter-eureka</artifactid> </dependency> <dependency> <groupid>org.springframework.cloud</groupid> <artifactid>spring-cloud-starter-bus-amqp</artifactid> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-actuator</artifactid> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-test</artifactid> <scope>test</scope> </dependency> </dependencies> |
(2)bootstrap.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
spring.application.name=configspace spring.cloud.config.label=master spring.cloud.config.profile=dev spring.cloud.config.uri= http: //localhost:5588/ eureka.client.serviceurl.defaultzone=http: //localhost:5555/eureka/ server.port= 5589 spring.rabbitmq.host= 118.89 . 237.88 spring.rabbitmq.port= 5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest management.security.enabled= false |
(3)其他不用改變
五、測試
(1)測試準備
一個服務注冊中心,eurekaserver,端口為5555;
一個分布式配置中心,configserver,端口為5588;
二個分布式配置,configclient,端口為5589、5590;(2)訪問http://localhost:5589/from
(3)訪問http://localhost:5590/from
rabbitmq:
(4)去倉庫修改password的值
1
2
3
|
from=git-dev-v1. 0 by springcloud config-server username=springcloud password= 1234567890 |
(5)post請求http://localhost:5589/bus/refresh或者http://localhost:5590/bus/refresh
成功請求后config-client會重新讀取配置文件
(6)再次訪問
- 如果post請求的是:http://localhost:5589/bus/refresh,請訪問http://localhost:5590/from
- 如果訪問出現401,則配置需要加上management.security.enabled=false
如果post請求的是:http://localhost:5590/bus/refresh,請訪問http://localhost:5589/from
另/bus/refresh接口可以指定服務,即使用“username”參數,比如 “/bus/refresh?destination=username:**”即刷新服務名為username的所有服務,不管ip地址。
(7)架構
(8)架構調整
既然springcloud bus的/bus/refresh接口提供了針對服務和實例進行配置更新的參數,那么我們的架構也可以相應做出一些調整。在之前的架構中,服務的配置更新需要通過向具體服務中的某個實例發送請求,再觸發對整個服務集群的配置更新。雖然能實現功能,但是這樣的結果是,我們指定的應用實例會不同千集群中的其他應用實例,這樣會增加集群內部的復雜度,不利于將來的運維工作。比如, 需要對服務實例進行遷移,那么我們不得不修改web hook中的配置等。所以要盡可能地讓服務集群中的各個節點是對等的。
因此, 我們將之前的架構做了 一些調整, 如下圖所示:
主要做了以下這些改動:
- 在configserver中也引入springcloud bus,將配置服務端也加入到消息總線中來。
- /bus/refresh請求不再發送到具體服務實例上,而是發送給config server,并通過des巨nation參數來指定需要更新配置的服務或實例。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:https://blog.csdn.net/smartdt/article/details/79073765