国产片侵犯亲女视频播放_亚洲精品二区_在线免费国产视频_欧美精品一区二区三区在线_少妇久久久_在线观看av不卡

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

香港云服务器
服務器之家 - 編程語言 - Java教程 - 在springboot中對kafka進行讀寫的示例代碼

在springboot中對kafka進行讀寫的示例代碼

2020-12-29 14:48冬天里的懶喵 Java教程

本篇文章主要介紹了在springboot中對kafka進行讀寫的示例代碼,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧

springbootkafka的client很好的實現了集成,使用非常方便,本文也實現了一個在springboot中實現操作kafka的demo。

1.pom配置

只需要在dependencies中增加 spring-kafka的配置即可。完整效果如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
<parent>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-parent</artifactid>
    <version>1.5.4.release</version>
  </parent>
 
  <properties>
    <java.version>1.8</java.version>
     <spring-kafka.version>1.2.2.release</spring-kafka.version>
    <project.build.sourceencoding>utf-8</project.build.sourceencoding>
  </properties>
 
  <dependencies>
    <dependency>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-starter-web</artifactid>
    </dependency>
    <dependency>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-starter</artifactid>
    </dependency>
    <dependency>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-starter-test</artifactid>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-starter-aop</artifactid>
    </dependency>
   <!-- spring-kafka -->
      <dependency>
      <groupid>org.springframework.kafka</groupid>
      <artifactid>spring-kafka</artifactid>
      <version>${spring-kafka.version}</version>
      </dependency>
      <dependency>
      <groupid>org.springframework.kafka</groupid>
      <artifactid>spring-kafka-test</artifactid>
      <version>${spring-kafka.version}</version>
      <scope>test</scope>
      </dependency>
   </dependencies>

2.生產者

參數配置類,其參數卸載yml文件中,通過@value注入

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.dhb.kafka.producer;
 
import org.apache.kafka.clients.producer.producerconfig;
import org.apache.kafka.common.serialization.stringserializer;
import org.springframework.beans.factory.annotation.value;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.kafka.core.defaultkafkaproducerfactory;
import org.springframework.kafka.core.kafkatemplate;
import org.springframework.kafka.core.producerfactory;
 
import java.util.hashmap;
import java.util.map;
 
@configuration
public class senderconfig {
 
  @value("${kafka.bootstrap-servers}")
  private string bootstrapservers;
 
  @bean
  public map<string,object> producerconfigs() {
    map<string,object> props = new hashmap<>();
    props.put(producerconfig.bootstrap_servers_config,this.bootstrapservers);
    props.put(producerconfig.key_serializer_class_config, stringserializer.class);
    props.put(producerconfig.value_serializer_class_config,stringserializer.class);
    props.put(producerconfig.acks_config,"0");
    return props;
  }
 
  @bean
  public producerfactory<string,string> producerfactory() {
    return new defaultkafkaproducerfactory<>(producerconfigs());
  }
 
  @bean
  public kafkatemplate<string,string> kafkatemplate() {
    return new kafkatemplate<string, string>(producerfactory());
  }
 
  @bean
  public sender sender() {
    return new sender();
  }
}

消息發送類

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.dhb.kafka.producer;
 
import lombok.extern.slf4j.slf4j;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.kafka.core.kafkatemplate;
 
@slf4j
public class sender {
 
  @autowired
  private kafkatemplate<string,string> kafkatemplate;
 
  public void send(string topic,string payload) {
    log.info("sending payload='{}' to topic='{}'",payload,topic);
    this.kafkatemplate.send(topic,payload);
  }
}

3.消費者

參數配置類

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.dhb.kafka.consumer;
 
import org.apache.kafka.clients.consumer.consumerconfig;
import org.apache.kafka.common.serialization.stringdeserializer;
import org.springframework.beans.factory.annotation.value;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.kafka.annotation.enablekafka;
import org.springframework.kafka.config.concurrentkafkalistenercontainerfactory;
import org.springframework.kafka.core.consumerfactory;
import org.springframework.kafka.core.defaultkafkaconsumerfactory;
 
import java.util.hashmap;
import java.util.map;
 
@configuration
@enablekafka
public class receiverconfig {
 
  @value("${kafka.bootstrap-servers}")
  private string bootstrapservers;
 
  public map<string,object> consumerconfigs() {
    map<string,object> props = new hashmap<>();
    props.put(consumerconfig.bootstrap_servers_config,bootstrapservers);
    props.put(consumerconfig.key_deserializer_class_config, stringdeserializer.class);
    props.put(consumerconfig.value_deserializer_class_config,stringdeserializer.class);
    props.put(consumerconfig.group_id_config,"helloword");
    return props;
  }
 
  @bean
  public consumerfactory<string,string> consumerfactory() {
    return new defaultkafkaconsumerfactory<>(consumerconfigs());
  }
 
  @bean
  public concurrentkafkalistenercontainerfactory<string,string> kafkalistenercontainerfactory() {
    concurrentkafkalistenercontainerfactory<string,string> factory =
        new concurrentkafkalistenercontainerfactory<>();
    factory.setconsumerfactory(consumerfactory());
    return factory;
  }
 
  @bean
  public receiver receiver() {
    return new receiver();
  }
 
}

消息接受類

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.dhb.kafka.consumer;
 
import lombok.extern.slf4j.slf4j;
import org.springframework.kafka.annotation.kafkalistener;
 
import java.util.concurrent.countdownlatch;
 
@slf4j
public class receiver {
 
  private countdownlatch latch = new countdownlatch(1);
 
  public countdownlatch getlatch() {
    return latch;
  }
 
  @kafkalistener(topics = "${kafka.topic.helloworld}")
  public void receive(string payload) {
    log.info("received payload='{}'",payload);
    latch.countdown();
  }
}

3.web測試類

定義了一個基于http的web測試接口

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.dhb.kafka.web;
 
import com.dhb.kafka.producer.sender;
import lombok.extern.slf4j.slf4j;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.bind.annotation.requestmethod;
import org.springframework.web.bind.annotation.restcontroller;
 
import javax.servlet.http.httpservletrequest;
import javax.servlet.http.httpservletresponse;
import java.io.ioexception;
 
@restcontroller
@slf4j
public class kafkaproducer {
 
  @autowired
  sender sender;
 
  @requestmapping(value = "/sender.action", method = requestmethod.post)
  public void exec(httpservletrequest request, httpservletresponse response,string data) throws ioexception{
    this.sender.send("testtopic",data);
    response.setcharacterencoding("utf-8");
    response.setcontenttype("text/json");
    response.getwriter().write("success");
    response.getwriter().flush();
    response.getwriter().close();
  }
 
}

4.啟動類及配置

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.dhb.kafka;
 
import org.springframework.boot.springapplication;
import org.springframework.boot.autoconfigure.springbootapplication;
 
@springbootapplication
public class kafkaapplication {
 
 
  public static void main(string[] args) {
    springapplication.run(kafkaapplication.class,args);
 
  }
}

application.yml

?
1
2
3
4
kafka:
 bootstrap-servers: 192.168.162.239:9092
 topic:
  helloworld: testtopic

程序結構:

在springboot中對kafka進行讀寫的示例代碼

包結構

5.讀寫測試

通過執行kafkaapplication的main方法啟動程序。然后打開postman進行測試:

在springboot中對kafka進行讀寫的示例代碼

運行后返回success

在springboot中對kafka進行讀寫的示例代碼

生產者日志:

在springboot中對kafka進行讀寫的示例代碼

消費者日志:

在springboot中對kafka進行讀寫的示例代碼

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。

原文鏈接:http://www.jianshu.com/p/3dcb64e49ac5?utm_source=tuicool&utm_medium=referral

延伸 · 閱讀

精彩推薦
1341
Weibo Article 1 Weibo Article 2 Weibo Article 3 Weibo Article 4 Weibo Article 5 Weibo Article 6 Weibo Article 7 Weibo Article 8 Weibo Article 9 Weibo Article 10 Weibo Article 11 Weibo Article 12 Weibo Article 13 Weibo Article 14 Weibo Article 15 Weibo Article 16 Weibo Article 17 Weibo Article 18 Weibo Article 19 Weibo Article 20 Weibo Article 21 Weibo Article 22 Weibo Article 23 Weibo Article 24 Weibo Article 25 Weibo Article 26 Weibo Article 27 Weibo Article 28 Weibo Article 29 Weibo Article 30 Weibo Article 31 Weibo Article 32 Weibo Article 33 Weibo Article 34 Weibo Article 35 Weibo Article 36 Weibo Article 37 Weibo Article 38 Weibo Article 39 Weibo Article 40
主站蜘蛛池模板: 成人国产精品视频 | 国产精品v一区二区三区 | 久久男人免费视频 | 国产成人jvid在线播放 | 久久久91| 成人午夜性a一级毛片免费看 | 亚洲国产精品福利 | 亚洲欧洲一区二区三区 | 国精品一区 | 亚洲国产成人久久 | 日本一区二区高清视频 | 精品久久久久久久中文字幕 | 国产精品久久久久久久久久免费看 | 久久精品国产免费 | 亚洲成人毛片 | 欧美一区二区在线 | 日本福利视频网 | www久久久久久久 | 日韩中文字幕av | 一级片在线播放 | 日韩综合视频在线观看 | 久久三区 | 韩日一区二区三区 | 日韩精品专区在线影院重磅 | 亚洲专区国产精品 | 欧美 亚洲 一区 | 亚洲精品永久免费 | 超级碰在线视频 | 视频一区二区三区中文字幕 | 久久久久综合精品福利啪啪 | 亚洲视频在线不卡 | 中文字幕亚洲一区二区va在线 | 亚洲经典一区 | 国产精品不卡一区二区三区 | 日韩在线观看成人 | 亚洲毛片网站 | 国产精品视频一 | 日韩中文字幕av在线 | 午夜影晥 | 国产日日夜夜操 | 在线国产小视频 |