国产片侵犯亲女视频播放_亚洲精品二区_在线免费国产视频_欧美精品一区二区三区在线_少妇久久久_在线观看av不卡

服務(wù)器之家:專(zhuān)注于服務(wù)器技術(shù)及軟件下載分享
分類(lèi)導(dǎo)航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術(shù)|正則表達(dá)式|C/C++|IOS|C#|Swift|Android|VB|R語(yǔ)言|JavaScript|易語(yǔ)言|vb.net|

服務(wù)器之家 - 編程語(yǔ)言 - Java教程 - springBoot整合RocketMQ及坑的示例代碼

springBoot整合RocketMQ及坑的示例代碼

2021-06-11 13:50龍俊潔 Java教程

這篇文章主要介紹了springBoot整合RocketMQ及坑的示例代碼,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧

版本:

  • jdk:1.8
  • springboot:1.5.10
  • rocketmq:4.2.0

pom 配置:    

?
1
2
3
4
5
6
7
8
9
10
<parent>
 <groupid>org.springframework.boot</groupid>
 <artifactid>spring-boot-starter-parent</artifactid>
 <version>1.5.10.release</version>
</parent>
<dependency>
  <groupid>org.apache.rocketmq</groupid>
  <artifactid>rocketmq-client</artifactid>
  <version>4.2.0</version>
</dependency>

application.properties  配置:

?
1
2
3
4
5
6
# 消費(fèi)者的組名
apache.rocketmq.consumer.pushconsumer=pushconsumer
# 生產(chǎn)者的組名
apache.rocketmq.producer.producergroup=producer
# nameserver地址
apache.rocketmq.namesrvaddr=localhost:9876

java代碼:

生產(chǎn)者

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package test.config.rocketmq;
 
import org.apache.rocketmq.client.producer.defaultmqproducer;
import org.apache.rocketmq.client.producer.sendresult;
import org.apache.rocketmq.common.message.message;
import org.apache.rocketmq.remoting.common.remotinghelper;
import org.springframework.beans.factory.annotation.value;
import org.springframework.stereotype.component;
import org.springframework.util.stopwatch;
import javax.annotation.postconstruct;
 
@component
public class rocketmqclient {
  /**
   * 生產(chǎn)者的組名
   */
  @value("${apache.rocketmq.producer.producergroup}")
  private string producergroup;
 
  /**
   * nameserver 地址
   */
  @value("${apache.rocketmq.namesrvaddr}")
  private string namesrvaddr;
 
  @postconstruct
  public void defaultmqproducer() {
    //生產(chǎn)者的組名
    defaultmqproducer producer = new defaultmqproducer(producergroup);
    //指定nameserver地址,多個(gè)地址以 ; 隔開(kāi)
    producer.setnamesrvaddr(namesrvaddr);
    producer.setvipchannelenabled(false);
    try {
      /**
       * producer對(duì)象在使用之前必須要調(diào)用start初始化,初始化一次即可
       * 注意:切記不可以在每次發(fā)送消息時(shí),都調(diào)用start方法
       */
      producer.start();
 
      //創(chuàng)建一個(gè)消息實(shí)例,包含 topic、tag 和 消息體
      //如下:topic 為 "topictest",tag 為 "push"
      message message = new message("topictest", "push", "發(fā)送消息----zhisheng-----".getbytes(remotinghelper.default_charset));
 
      stopwatch stop = new stopwatch();
      stop.start();
 
      for (int i = 0; i < 1; i++) {
        sendresult result = producer.send(message);
        system.out.println("發(fā)送響應(yīng):msgid:" + result.getmsgid() + ",發(fā)送狀態(tài):" + result.getsendstatus());
      }
      stop.stop();
      system.out.println("----------------發(fā)送一萬(wàn)條消息耗時(shí):" + stop.gettotaltimemillis());
    } catch (exception e) {
      e.printstacktrace();
    } finally {
      producer.shutdown();
    }
  }
}

消費(fèi)者: 

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import org.apache.rocketmq.client.consumer.defaultmqpushconsumer;
import org.apache.rocketmq.client.consumer.listener.consumeconcurrentlystatus;
import org.apache.rocketmq.client.consumer.listener.messagelistenerconcurrently;
import org.apache.rocketmq.common.consumer.consumefromwhere;
import org.apache.rocketmq.common.message.messageext;
import org.apache.rocketmq.remoting.common.remotinghelper;
import org.springframework.beans.factory.annotation.value;
import org.springframework.stereotype.component;
 
import javax.annotation.postconstruct;
 
 
@component
public class rocketmqserver {
  /**
   * 消費(fèi)者的組名
   */
  @value("${apache.rocketmq.consumer.pushconsumer}")
  private string consumergroup;
 
  /**
   * nameserver 地址
   */
  @value("${apache.rocketmq.namesrvaddr}")
  private string namesrvaddr;
 
  @postconstruct
  public void defaultmqpushconsumer() {
    //消費(fèi)者的組名
    defaultmqpushconsumer consumer = new defaultmqpushconsumer(consumergroup);
 
    //指定nameserver地址,多個(gè)地址以 ; 隔開(kāi)
    consumer.setnamesrvaddr(namesrvaddr);
    consumer.setvipchannelenabled(false);
    try {
      //訂閱pushtopic下tag為push的消息
      consumer.subscribe("topictest", "push");
 
      //設(shè)置consumer第一次啟動(dòng)是從隊(duì)列頭部開(kāi)始消費(fèi)還是隊(duì)列尾部開(kāi)始消費(fèi)
      //如果非第一次啟動(dòng),那么按照上次消費(fèi)的位置繼續(xù)消費(fèi)
      consumer.setconsumefromwhere(consumefromwhere.consume_from_first_offset);
      consumer.registermessagelistener((messagelistenerconcurrently) (list, context) -> {
        try {
          for (messageext messageext : list) {
 
            system.out.println("messageext: " + messageext);//輸出消息內(nèi)容
 
            string messagebody = new string(messageext.getbody(), remotinghelper.default_charset);
 
            system.out.println("消費(fèi)響應(yīng):msgid : " + messageext.getmsgid() + ", msgbody : " + messagebody);//輸出消息內(nèi)容
          }
        } catch (exception e) {
          e.printstacktrace();
          return consumeconcurrentlystatus.reconsume_later; //稍后再試
        }
        return consumeconcurrentlystatus.consume_success; //消費(fèi)成功
      });
      consumer.start();
    } catch (exception e) {
      e.printstacktrace();
    }
  }
}

掉坑總結(jié):

1.rocketmq啟動(dòng)時(shí),命令不是  mqbroker -n 127.0.0.1:9876

         正確應(yīng)該是:mqbroker -n 127.0.0.1:9876 butiautocreatetopicenable=true

         否則會(huì)拋出:no route info of this topic, topictest

2.客戶(hù)端連接時(shí)拋出異常

        org.apache.rocketmq.client.exception.mqclientexception: 

        send [3] times, still failed, cost [3180]ms, topic: topictest, brokerssent: \

        [win-93cgo0s5g25, win-93cgo0s5g25, win-93cgo0s5g25]

解決方式兩種

1.producer.setvipchannelenabled(false); 生產(chǎn)者和消費(fèi)者添加這行代買(mǎi)。

2.降rocketmq版本,降成3.2.6

關(guān)于spring.rocketmq.name-server的坑

看下圖:

springBoot整合RocketMQ及坑的示例代碼

注意:

如果你是springboot2.0+的框架,或者是jdk10。

你需要將你自己的項(xiàng)目配置文件中的,spring.rocketmq.name-server改成

spring.rocketmq.nameserver。注意是nameserver。

不然就會(huì)報(bào)各種稀奇古怪的bug。

關(guān)于啟動(dòng)報(bào)內(nèi)存不足的錯(cuò)

在安裝啟動(dòng)name server和broker的時(shí)候,一定要修改配置文件,不然內(nèi)存會(huì)爆炸。

native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory 

springBoot整合RocketMQ及坑的示例代碼

將下面的配置文件根據(jù)你的需要改

我這里以前默認(rèn)是xms4g,都是g,我修改到m就行了。

java_opt="${java_opt} -server -xms256m -xmx256m -xmn128m -xx:metaspacesize=128m -xx:maxmetaspacesize=320m"

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持服務(wù)器之家。

原文鏈接:https://blog.csdn.net/qq_24853627/article/details/79443437

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 91人人看| 国产精品精品 | www.久| 免费在线看a | 在线成人国产 | 亚洲成人自拍 | 日韩三区视频 | 午夜日韩 | 久久久久久久久久久影视 | 久久精品中文 | 久久99精品久久久久久园产越南 | 国产天堂| 欧美成人精品在线视频 | 日韩成人精品 | 一区二区精品视频 | 日本 欧美 国产 | 99久久影院 | 国产精品久久久久久久久免费桃花 | 欧美一区永久视频免费观看 | 人人做人人澡人人爽欧美 | 久久精品无码一区二区三区 | 99re| 欧美电影在线观看网站 | 91精品国产综合久久久久 | 亚洲欧美自拍视频 | 国产欧美日韩在线观看 | 成人黄色片网站 | 日韩一区二区三区视频 | www.99re | 国产精品尤物 | 欧美日韩精品免费 | 中文字幕乱码一区二区三区 | 欧美成人久久久免费播放 | 久久久久久av | 69久久久久久| 91国内精品久久 | 欧美操穴| 亚洲电影在线观看 | 国产在线观看一区 | 国产亚洲精品美女久久久久久久久久 | 99热在线精品免费 |