国产片侵犯亲女视频播放_亚洲精品二区_在线免费国产视频_欧美精品一区二区三区在线_少妇久久久_在线观看av不卡

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - 詳解spring boot集成RabbitMQ

詳解spring boot集成RabbitMQ

2020-09-03 14:08SamHxm Java教程

RabbitMQ作為AMQP的代表性產品,在項目中大量使用。結合現在主流的spring boot,極大簡化了開發過程中所涉及到的消息通信問題。

RabbitMQ作為AMQP的代表性產品,在項目中大量使用。結合現在主流的spring boot,極大簡化了開發過程中所涉及到的消息通信問題。

首先正確的安裝RabbitMQ及運行正常。

RabbitMQ需啊erlang環境,所以首先安裝對應版本的erlang,可在RabbitMQ官網下載

?
1
# rpm -ivh erlang-19.0.4-1.el7.centos.x86_64.rpm

使用yum安裝RabbitMQ,避免缺少依賴包引起的安裝失敗

?
1
# yum install rabbitmq-server-3.6.6-1.el7.noarch.rpm

啟動RabbitMQ

?
1
# /sbin/service rabbitmq-server start

由于RabbitMQ默認提供的guest用戶只能本地訪問,所以額外創建用戶用于測試

?
1
2
# /sbin/rabbitmqctl add_user test test123
用戶名:test,密碼:test123

開啟web管理插件

?
1
# rabbitmq-plugins enable rabbitmq_management

并使用之前創建的用戶登錄,并設置該用戶為administrator,虛擬主機地址為/

spring boot 引入相關依賴

?
1
2
3
4
5
6
<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>
</dependencies>

消息生產者

application.properties添加一下配置

?
1
2
3
4
5
6
7
spring.rabbitmq.host=192.168.1.107
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=test123
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

spring boot配置類,作用為指定隊列,交換器類型及綁定操作

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
public class RabbitConfig {
 
  //聲明隊列
  @Bean
  public Queue queue1() {
    return new Queue("hello.queue1", true); // true表示持久化該隊列
  }
 
  @Bean
  public Queue queue2() {
    return new Queue("hello.queue2", true);
  }
 
  //聲明交互器
  @Bean
  TopicExchange topicExchange() {
    return new TopicExchange("topicExchange");
  }
 
  //綁定
  @Bean
  public Binding binding1() {
    return BindingBuilder.bind(queue1()).to(topicExchange()).with("key.1");
  }
 
  @Bean
  public Binding binding2() {
    return BindingBuilder.bind(queue2()).to(topicExchange()).with("key.#");
  }
 
}

共聲明了2個隊列,分別是hello.queue1,hello.queue2,交換器類型為TopicExchange,并與hello.queue1,hello.queue2隊列分別綁定。

生產者類

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import java.util.UUID;
 
import javax.annotation.PostConstruct;
 
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
@Component
public class Sender implements RabbitTemplate.ConfirmCallback, ReturnCallback {
 
  @Autowired
  private RabbitTemplate rabbitTemplate;
 
  @PostConstruct
  public void init() {
    rabbitTemplate.setConfirmCallback(this);
    rabbitTemplate.setReturnCallback(this);
  }
 
  @Override
  public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    if (ack) {
      System.out.println("消息發送成功:" + correlationData);
    } else {
      System.out.println("消息發送失敗:" + cause);
    }
 
  }
 
  @Override
  public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    System.out.println(message.getMessageProperties().getCorrelationIdString() + " 發送失敗");
 
  }
 
  //發送消息,不需要實現任何接口,供外部調用。
  public void send(String msg){
 
    CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
 
    System.out.println("開始發送消息 : " + msg.toLowerCase());
    String response = rabbitTemplate.convertSendAndReceive("topicExchange", "key.1", msg, correlationId).toString();
    System.out.println("結束發送消息 : " + msg.toLowerCase());
    System.out.println("消費者響應 : " + response + " 消息處理完成");
  }
}

要點:

1.注入RabbitTemplate

2.實現RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback接口(非必須)。
ConfirmCallback接口用于實現消息發送到RabbitMQ交換器后接收ack回調。ReturnCallback接口用于實現消息發送到RabbitMQ交換器,但無相應隊列與交換器綁定時的回調。

3.實現消息發送方法。調用rabbitTemplate相應的方法即可。rabbitTemplate常用發送方法有

?
1
2
3
rabbitTemplate.send(message);  //發消息,參數類型為org.springframework.amqp.core.Message
rabbitTemplate.convertAndSend(object); //轉換并發送消息。 將參數對象轉換為org.springframework.amqp.core.Message后發送
rabbitTemplate.convertSendAndReceive(message) //轉換并發送消息,且等待消息者返回響應消息。

針對業務場景選擇合適的消息發送方式即可。

消息消費者

application.properties添加一下配置

?
1
2
3
4
5
6
7
spring.rabbitmq.host=192.168.1.107
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=test123
 
spring.rabbitmq.listener.concurrency=2  //最小消息監聽線程數
spring.rabbitmq.listener.max-concurrency=2 //最大消息監聽線程數

消費者類

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
@Component
public class Receiver {
 
  @RabbitListener(queues = "hello.queue1")
  public String processMessage1(String msg) {
    System.out.println(Thread.currentThread().getName() + " 接收到來自hello.queue1隊列的消息:" + msg);
    return msg.toUpperCase();
  }
 
  @RabbitListener(queues = "hello.queue2")
  public void processMessage2(String msg) {
    System.out.println(Thread.currentThread().getName() + " 接收到來自hello.queue2隊列的消息:" + msg);
  }
}

由于定義了2個隊列,所以分別定義不同的監聽器監聽不同的隊列。由于最小消息監聽線程數和最大消息監聽線程數都是2,所以每個監聽器各有2個線程實現監聽功能。

要點:

1.監聽器參數類型與消息實際類型匹配。在生產者中發送的消息實際類型是String,所以這里監聽器參數類型也是String。

2.如果監聽器需要有響應返回給生產者,直接在監聽方法中return即可。

運行測試

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import java.util.Date;
 
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
import com.sam.demo.rabbitmq.Application;
import com.sam.demo.rabbitmq.sender.Sender;
 
@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = Application.class)
public class RabbitTests {
 
  @Autowired
  private Sender sender;
 
  @Test
  public void sendTest() throws Exception {
    while(true){
      String msg = new Date().toString();
      sender.send(msg);
      Thread.sleep(1000);
    }
  }
}

輸出:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
開始發送消息 : wed mar 29 23:20:52 cst 2017
SimpleAsyncTaskExecutor-1 接收到來自hello.queue2隊列的消息:Wed Mar 29 23:20:52 CST 2017
SimpleAsyncTaskExecutor-2 接收到來自hello.queue1隊列的消息:Wed Mar 29 23:20:52 CST 2017
結束發送消息 : wed mar 29 23:20:52 cst 2017
消費者響應 : WED MAR 29 23:20:52 CST 2017 消息處理完成
------------------------------------------------
消息發送成功:CorrelationData [id=340d14e6-cfcc-4653-9f95-29b37d50f886]
開始發送消息 : wed mar 29 23:20:53 cst 2017
SimpleAsyncTaskExecutor-1 接收到來自hello.queue1隊列的消息:Wed Mar 29 23:20:53 CST 2017
SimpleAsyncTaskExecutor-2 接收到來自hello.queue2隊列的消息:Wed Mar 29 23:20:53 CST 2017
結束發送消息 : wed mar 29 23:20:53 cst 2017
消費者響應 : WED MAR 29 23:20:53 CST 2017 消息處理完成
------------------------------------------------
消息發送成功:CorrelationData [id=e4e01f89-d0d4-405e-80f0-85bb20238f34]
開始發送消息 : wed mar 29 23:20:54 cst 2017
SimpleAsyncTaskExecutor-2 接收到來自hello.queue1隊列的消息:Wed Mar 29 23:20:54 CST 2017
SimpleAsyncTaskExecutor-1 接收到來自hello.queue2隊列的消息:Wed Mar 29 23:20:54 CST 2017
結束發送消息 : wed mar 29 23:20:54 cst 2017
消費者響應 : WED MAR 29 23:20:54 CST 2017 消息處理完成
------------------------------------------------

如果需要使用的其他的交換器類型,spring中都已提供實現,所有的交換器均實現org.springframework.amqp.core.AbstractExchange接口。

常用交換器類型如下:

Direct(DirectExchange):direct 類型的行為是"先匹配, 再投送". 即在綁定時設定一個 routing_key, 消息的routing_key完全匹配時, 才會被交換器投送到綁定的隊列中去。

Topic(TopicExchange):按規則轉發消息(最靈活)。

Headers(HeadersExchange):設置header attribute參數類型的交換機。

Fanout(FanoutExchange):轉發消息到所有綁定隊列。

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。

原文鏈接:http://www.jianshu.com/p/e1258c004314#

延伸 · 閱讀

精彩推薦
Weibo Article 1 Weibo Article 2 Weibo Article 3 Weibo Article 4 Weibo Article 5 Weibo Article 6 Weibo Article 7 Weibo Article 8 Weibo Article 9 Weibo Article 10 Weibo Article 11 Weibo Article 12 Weibo Article 13 Weibo Article 14 Weibo Article 15 Weibo Article 16 Weibo Article 17 Weibo Article 18 Weibo Article 19 Weibo Article 20 Weibo Article 21 Weibo Article 22 Weibo Article 23 Weibo Article 24 Weibo Article 25 Weibo Article 26 Weibo Article 27 Weibo Article 28 Weibo Article 29 Weibo Article 30 Weibo Article 31 Weibo Article 32 Weibo Article 33 Weibo Article 34 Weibo Article 35 Weibo Article 36 Weibo Article 37 Weibo Article 38 Weibo Article 39 Weibo Article 40
主站蜘蛛池模板: 中文字幕一区二区三区日韩精品 | 久久成人高清 | 国产精品久久久久久久久免费桃花 | 欧美视频一二三区 | 亚洲国产精品99久久久久久久久 | 蜜臀久久99精品久久久无需会员 | 中文字幕在线观看日韩 | 亚洲免费久久久 | 久久久国产视频 | 五月天色婷婷视频 | 久草视频网站 | 欧美高清视频在线观看 | 天天综合久久 | 国产精品一卡二卡 | 自拍偷拍小视频 | 久久av一区 | 国产日韩精品一区 | 91中文在线 | 99久久免费视频在线观看 | 久久久毛片 | 亚洲免费观看 | 欧美一区二区高清视频 | 免费观看aaa | 天天摸天天操 | 美女在线视频一区二区 | 激情久久婷婷 | 成人黄色电影小说 | 在线播放国产一区二区三区 | 精品国产髙清在线看国产毛片 | 亚洲国产成人av | 日韩精品一区在线 | 久久精品无码一区二区三区 | 亚洲欧美另类久久久精品2019 | 91精品福利少妇午夜100集 | 午夜小视频在线 | 久久久久网站 | 91九色视频pron | 久久久美女| 国产精品一二三区 | 91精品国产一区二区三区 | 日本中文字幕一区二区 |