国产片侵犯亲女视频播放_亚洲精品二区_在线免费国产视频_欧美精品一区二区三区在线_少妇久久久_在线观看av不卡

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - Spring boot集成Kafka+Storm的示例代碼

Spring boot集成Kafka+Storm的示例代碼

2021-03-13 14:17LeeZer Java教程

這篇文章主要介紹了Spring boot集成Kafka+Storm的示例代碼,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧

前言

由于業務需求需要把strom與kafka整合到spring boot項目里,實現其他服務輸出日志至kafka訂閱話題,storm實時處理該話題完成數據監控及其他數據統計,但是網上教程較少,今天想寫的就是如何整合storm+kafka 到spring boot,順帶說一說我遇到的坑。

使用工具及環境配置

? 1. java 版本jdk-1.8

? 2. 編譯工具使用idea-2017

? 3. maven作為項目管理

? 4.spring boot-1.5.8.release

需求體現

1.為什么需要整合到spring boot

為了使用spring boot 統一管理各種微服務,及同時避免多個分散配置

2.具體思路及整合原因

? 使用spring boot統一管理kafka、storm、redis等所需要的bean,通過其他服務日志收集至kafka,kafka實時發送日志至storm,在strom bolt時進行相應的處理操作

遇到的問題

? 1.使用spring boot并沒有相關整合storm

? 2.以spring boot啟動方式不知道如何觸發提交topolgy

? 3.提交topology時遇到numbis not client localhost 問題

? 4.storm bolt中無法通過注解獲得實例化bean進行相應的操作

解決思路

在整合之前我們需要知道相應的spring boot 的啟動方式及配置(如果你在閱讀本文時,默認你已經對storm,kafka及spring boot有相關了解及使用)

spring boot 對storm進行整合的例子在網上很少,但是因為有相應的需求,因此我們還是需要整合.

首先導入所需要jar包:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
<dependency>
 <groupid>org.apache.kafka</groupid>
 <artifactid>kafka-clients</artifactid>
 <version>0.10.1.1</version>
 </dependency>
 
 <dependency>
 <groupid>org.springframework.cloud</groupid>
 <artifactid>spring-cloud-starter-stream-kafka</artifactid>
 <exclusions>
 <exclusion>
  <artifactid>zookeeper</artifactid>
  <groupid>org.apache.zookeeper</groupid>
 </exclusion>
 <exclusion>
  <artifactid>spring-boot-actuator</artifactid>
  <groupid>org.springframework.boot</groupid>
 </exclusion>
 <exclusion>
  <artifactid>kafka-clients</artifactid>
  <groupid>org.apache.kafka</groupid>
 </exclusion>
 </exclusions>
 </dependency>
 
 <dependency>
 <groupid>org.springframework.kafka</groupid>
 <artifactid>spring-kafka</artifactid>
 <exclusions>
 <exclusion>
  <artifactid>kafka-clients</artifactid>
  <groupid>org.apache.kafka</groupid>
 </exclusion>
 </exclusions>
 </dependency>
 
 <dependency>
 <groupid>org.springframework.data</groupid>
 <artifactid>spring-data-hadoop</artifactid>
 <version>2.5.0.release</version>
 <exclusions>
 <exclusion>
  <groupid>org.slf4j</groupid>
  <artifactid>slf4j-log4j12</artifactid>
 </exclusion>
 <exclusion>
  <artifactid>commons-logging</artifactid>
  <groupid>commons-logging</groupid>
 </exclusion>
 <exclusion>
  <artifactid>netty</artifactid>
  <groupid>io.netty</groupid>
 </exclusion>
 <exclusion>
  <artifactid>jackson-core-asl</artifactid>
  <groupid>org.codehaus.jackson</groupid>
 </exclusion>
 <exclusion>
  <artifactid>curator-client</artifactid>
  <groupid>org.apache.curator</groupid>
 </exclusion>
 <exclusion>
  <artifactid>jettison</artifactid>
  <groupid>org.codehaus.jettison</groupid>
 </exclusion>
 <exclusion>
  <artifactid>jackson-mapper-asl</artifactid>
  <groupid>org.codehaus.jackson</groupid>
 </exclusion>
 <exclusion>
  <artifactid>jackson-jaxrs</artifactid>
  <groupid>org.codehaus.jackson</groupid>
 </exclusion>
 <exclusion>
  <artifactid>snappy-java</artifactid>
  <groupid>org.xerial.snappy</groupid>
 </exclusion>
 <exclusion>
  <artifactid>jackson-xc</artifactid>
  <groupid>org.codehaus.jackson</groupid>
 </exclusion>
 <exclusion>
  <artifactid>guava</artifactid>
  <groupid>com.google.guava</groupid>
 </exclusion>
 <exclusion>
  <artifactid>hadoop-mapreduce-client-core</artifactid>
  <groupid>org.apache.hadoop</groupid>
 </exclusion>
 <exclusion>
  <artifactid>zookeeper</artifactid>
  <groupid>org.apache.zookeeper</groupid>
 </exclusion>
 <exclusion>
  <artifactid>servlet-api</artifactid>
  <groupid>javax.servlet</groupid>
 </exclusion>
 
 </exclusions>
 </dependency>
 <dependency>
 <groupid>org.apache.zookeeper</groupid>
 <artifactid>zookeeper</artifactid>
 <version>3.4.10</version>
 <exclusions>
 <exclusion>
  <artifactid>slf4j-log4j12</artifactid>
  <groupid>org.slf4j</groupid>
 </exclusion>
 </exclusions>
 </dependency>
 <dependency>
 <groupid>org.apache.hbase</groupid>
 <artifactid>hbase-client</artifactid>
 <version>1.2.4</version>
 <exclusions>
 <exclusion>
  <artifactid>log4j</artifactid>
  <groupid>log4j</groupid>
 </exclusion>
 <exclusion>
  <artifactid>zookeeper</artifactid>
  <groupid>org.apache.zookeeper</groupid>
 </exclusion>
 <exclusion>
  <artifactid>netty</artifactid>
  <groupid>io.netty</groupid>
 </exclusion>
 <exclusion>
  <artifactid>hadoop-common</artifactid>
  <groupid>org.apache.hadoop</groupid>
 </exclusion>
 <exclusion>
  <artifactid>guava</artifactid>
  <groupid>com.google.guava</groupid>
 </exclusion>
 <exclusion>
  <artifactid>hadoop-annotations</artifactid>
  <groupid>org.apache.hadoop</groupid>
 </exclusion>
 <exclusion>
  <artifactid>hadoop-yarn-common</artifactid>
  <groupid>org.apache.hadoop</groupid>
 </exclusion>
 <exclusion>
  <artifactid>slf4j-log4j12</artifactid>
  <groupid>org.slf4j</groupid>
 </exclusion>
 </exclusions>
 </dependency>
 <dependency>
 <groupid>org.apache.hadoop</groupid>
 <artifactid>hadoop-common</artifactid>
 <version>2.7.3</version>
 <exclusions>
 <exclusion>
  <artifactid>commons-logging</artifactid>
  <groupid>commons-logging</groupid>
 </exclusion>
 <exclusion>
  <artifactid>curator-client</artifactid>
  <groupid>org.apache.curator</groupid>
 </exclusion>
 <exclusion>
  <artifactid>jackson-mapper-asl</artifactid>
  <groupid>org.codehaus.jackson</groupid>
 </exclusion>
 <exclusion>
  <artifactid>jackson-core-asl</artifactid>
  <groupid>org.codehaus.jackson</groupid>
 </exclusion>
 <exclusion>
  <artifactid>log4j</artifactid>
  <groupid>log4j</groupid>
 </exclusion>
 <exclusion>
  <artifactid>snappy-java</artifactid>
  <groupid>org.xerial.snappy</groupid>
 </exclusion>
 <exclusion>
  <artifactid>zookeeper</artifactid>
  <groupid>org.apache.zookeeper</groupid>
 </exclusion>
 <exclusion>
  <artifactid>guava</artifactid>
  <groupid>com.google.guava</groupid>
 </exclusion>
 <exclusion>
  <artifactid>hadoop-auth</artifactid>
  <groupid>org.apache.hadoop</groupid>
 </exclusion>
 <exclusion>
  <artifactid>commons-lang</artifactid>
  <groupid>commons-lang</groupid>
 </exclusion>
 <exclusion>
  <artifactid>slf4j-log4j12</artifactid>
  <groupid>org.slf4j</groupid>
 </exclusion>
 <exclusion>
  <artifactid>servlet-api</artifactid>
  <groupid>javax.servlet</groupid>
 </exclusion>
 </exclusions>
 </dependency>
 <dependency>
 <groupid>org.apache.hadoop</groupid>
 <artifactid>hadoop-mapreduce-examples</artifactid>
 <version>2.7.3</version>
 <exclusions>
 <exclusion>
  <artifactid>commons-logging</artifactid>
  <groupid>commons-logging</groupid>
 </exclusion>
 <exclusion>
  <artifactid>netty</artifactid>
  <groupid>io.netty</groupid>
 </exclusion>
 <exclusion>
  <artifactid>guava</artifactid>
  <groupid>com.google.guava</groupid>
 </exclusion>
 <exclusion>
  <artifactid>log4j</artifactid>
  <groupid>log4j</groupid>
 </exclusion>
 <exclusion>
  <artifactid>servlet-api</artifactid>
  <groupid>javax.servlet</groupid>
 </exclusion>
 </exclusions>
 </dependency>
 
 <!--storm-->
 <dependency>
 <groupid>org.apache.storm</groupid>
 <artifactid>storm-core</artifactid>
 <version>${storm.version}</version>
 <scope>${provided.scope}</scope>
 <exclusions>
 <exclusion>
  <groupid>org.apache.logging.log4j</groupid>
  <artifactid>log4j-slf4j-impl</artifactid>
 </exclusion>
 <exclusion>
  <artifactid>servlet-api</artifactid>
  <groupid>javax.servlet</groupid>
 </exclusion>
 </exclusions>
 </dependency>
 
 <dependency>
 <groupid>org.apache.storm</groupid>
 <artifactid>storm-kafka</artifactid>
 <version>1.1.1</version>
 <exclusions>
 <exclusion>
  <artifactid>kafka-clients</artifactid>
  <groupid>org.apache.kafka</groupid>
 </exclusion>
 </exclusions>
 </dependency>

其中去除jar包是因為需要相與項目構建依賴有多重依賴問題,storm版本為1.1.0  spring boot相關依賴為

```java

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<!-- spring boot -->
  <dependency>
   <groupid>org.springframework.boot</groupid>
   <artifactid>spring-boot-starter</artifactid>
   <exclusions>
    <exclusion>
     <groupid>org.springframework.boot</groupid>
     <artifactid>spring-boot-starter-logging</artifactid>
    </exclusion>
   </exclusions>
  </dependency>
  <dependency>
   <groupid>org.springframework.boot</groupid>
   <artifactid>spring-boot-starter-web</artifactid>
  </dependency>
  <dependency>
   <groupid>org.springframework.boot</groupid>
   <artifactid>spring-boot-starter-aop</artifactid>
  </dependency>
  <dependency>
   <groupid>org.springframework.boot</groupid>
   <artifactid>spring-boot-starter-test</artifactid>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupid>org.springframework.boot</groupid>
   <artifactid>spring-boot-starter-log4j2</artifactid>
  </dependency>
  <dependency>
   <groupid>org.mybatis.spring.boot</groupid>
   <artifactid>mybatis-spring-boot-starter</artifactid>
   <version>${mybatis-spring.version}</version>
  </dependency>
  <dependency>
   <groupid>org.springframework.boot</groupid>
   <artifactid>spring-boot-configuration-processor</artifactid>
   <optional>true</optional>
  </dependency>

ps:maven的jar包僅因為項目使用需求,不是最精簡,僅供大家參考.

項目結構:

config-存儲不同環境配置文件

Spring boot集成Kafka+Storm的示例代碼

存儲構建spring boot 相關實現類 其他如構建名

啟動spring boot的時候我們會發現

其實開始整合前,對storm了解的較少,屬于剛開始沒有接觸過,后面參考發現整合到spring boot里面啟動spring boot之后并沒有相應的方式去觸發提交topolgy的函數,所以也造成了以為啟動spring boot之后就完事了結果等了半個小時什么事情都沒發生才發現沒有實現觸發提交函數.

為了解決這個問題我的想法是: 啟動spring boot->創建kafka監聽topic然后啟動topolgy完成啟動,可是這樣的問題kafka監聽這個主題會重復觸發topolgy,這明顯不是我們想要的.看了一會后發現spring 有相關啟動完成之后執行某個時間方法,這個對我來說簡直是救星啊.所以現在觸發topolgy的思路變為:

啟動spring boot ->執行觸發方法->完成相應的觸發條件

構建方法為:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
 * @author leezer
 * @date 2017/12/28
 * spring加載完后自動自動提交topology
 **/
@configuration
@component
public class autoload implements applicationlistener<contextrefreshedevent> {
 
 private static string brokerzkstr;
 private static string topic;
 private static string host;
 private static string port;
 public autoload(@value("${storm.brokerzkstr}") string brokerzkstr,
     @value("${zookeeper.host}") string host,
     @value("${zookeeper.port}") string port,
     @value("${kafka.default-topic}") string topic
 ){
  brokerzkstr = brokerzkstr;
  host= host;
  topic= topic;
  port= port;
 }
 
 @override
 public void onapplicationevent(contextrefreshedevent event) {
  try {
   //實例化topologybuilder類。
   topologybuilder topologybuilder = new topologybuilder();
   //設置噴發節點并分配并發數,該并發數將會控制該對象在集群中的線程數。
   brokerhosts brokerhosts = new zkhosts(brokerzkstr);
   // 配置kafka訂閱的topic,以及zookeeper中數據節點目錄和名字
   spoutconfig spoutconfig = new spoutconfig(brokerhosts, topic, "/storm", "s32");
   spoutconfig.scheme = new schemeasmultischeme(new stringscheme());
   spoutconfig.zkservers = collections.singletonlist(host);
   spoutconfig.zkport = integer.parseint(port);
   //從kafka最新輸出日志讀取
   spoutconfig.startoffsettime = offsetrequest.latesttime();
   kafkaspout receiver = new kafkaspout(spoutconfig);
   topologybuilder.setspout("kafka-spout", receiver, 1).setnumtasks(2);
   topologybuilder.setbolt("alarm-bolt", new alarmbolt(), 1).setnumtasks(2).shufflegrouping("kafka-spout");
   config config = new config();
   config.setdebug(false);
   /*設置該topology在storm集群中要搶占的資源slot數,一個slot對應這supervisor節點上的以個worker進程,如果你分配的spot數超過了你的物理節點所擁有的worker數目的話,有可能提交不成功,加入你的集群上面已經有了一些topology而現在還剩下2個worker資源,如果你在代碼里分配4個給你的topology的話,那么這個topology可以提交但是提交以后你會發現并沒有運行。 而當你kill掉一些topology后釋放了一些slot后你的這個topology就會恢復正常運行。
   */
   config.setnumworkers(1);
   localcluster cluster = new localcluster();
   cluster.submittopology("kafka-spout", config, topologybuilder.createtopology());
  } catch (exception e) {
   e.printstacktrace();
  }
 }
}

? 注:

啟動項目時因為使用的是內嵌tomcat進行啟動,可能會報如下錯誤

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
[tomcat-startstop-1] error o.a.c.c.containerbase - a child container failed during start
java.util.concurrent.executionexception: org.apache.catalina.lifecycleexception: failed to start component [standardengine[tomcat].standardhost[localhost].tomcatembeddedcontext[]]
 at java.util.concurrent.futuretask.report(futuretask.java:122) ~[?:1.8.0_144]
 at java.util.concurrent.futuretask.get(futuretask.java:192) ~[?:1.8.0_144]
 at org.apache.catalina.core.containerbase.startinternal(containerbase.java:939) [tomcat-embed-core-8.5.23.jar:8.5.23]
 at org.apache.catalina.core.standardhost.startinternal(standardhost.java:872) [tomcat-embed-core-8.5.23.jar:8.5.23]
 at org.apache.catalina.util.lifecyclebase.start(lifecyclebase.java:150) [tomcat-embed-core-8.5.23.jar:8.5.23]
 at org.apache.catalina.core.containerbase$startchild.call(containerbase.java:1419) [tomcat-embed-core-8.5.23.jar:8.5.23]
 at org.apache.catalina.core.containerbase$startchild.call(containerbase.java:1409) [tomcat-embed-core-8.5.23.jar:8.5.23]
 at java.util.concurrent.futuretask.run$$$capture(futuretask.java:266) [?:1.8.0_144]
 at java.util.concurrent.futuretask.run(futuretask.java) [?:1.8.0_144]
 at java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1149) [?:1.8.0_144]
 at java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java:624) [?:1.8.0_144]
 at java.lang.thread.run(thread.java:748) [?:1.8.0_144]

這是因為有相應導入的jar包引入了servlet-api版本低于內嵌版本,我們需要做的就是打開maven依賴把其去除

?
1
2
3
4
<exclusion>
 <artifactid>servlet-api</artifactid>
 <groupid>javax.servlet</groupid>
</exclusion>

然后重新啟動就可以了.

啟動過程中還有可能報:

 

復制代碼 代碼如下:

org.apache.storm.utils.nimbusleadernotfoundexception: could not find leader nimbus from seed hosts [localhost]. did you specify a valid list of nimbus hosts for config nimbus.seeds?at org.apache.storm.utils.nimbusclient.getconfiguredclientas(nimbusclient.java:90

 

 

這個問題我思考了很久,發現網上的解釋都是因為storm配置問題導致不對,可是我的storm是部署在服務器上的.并沒有相關的配置,按理也應該去服務器上讀取相關配置,可是結果并不是這樣的。最后嘗試了幾個做法發現都不對,這里才發現,在構建集群的時候storm提供了相應的本地集群

?
1
localcluster cluster = new localcluster();

進行本地測試,如果在本地測試就使用其進行部署測試,如果部署到服務器上需要把:

?
1
2
3
cluster.submittopology("kafka-spout", config, topologybuilder.createtopology());
//修正為:
stormsubmitter.submittopology("kafka-spout", config, topologybuilder.createtopology());

進行任務提交;

以上解決了上面所述的問題1-3

問題4:是在bolt中使用相關bean實例,我發現我把其使用@component加入spring中也無法獲取到實例:我的猜想是在我們構建提交topolgy的時候,它會在:

 

復制代碼 代碼如下:

topologybuilder.setbolt("alarm-bolt",new alarmbolt(),1).setnumtasks(2).shufflegrouping("kafka-spout");

 

 

執行bolt相關:

?
1
2
3
4
5
6
7
@override
 public void prepare(map stormconf, topologycontext context,
      outputcollector collector) {
  this.collector = collector;
  stormlauncher stormlauncher = stormlauncher.getstormlauncher();
  datarepositorys =(alarmdatarepositorys)   stormlauncher.getbean("alarmdatarepositorys");
 }

而不會實例化bolt,導致線程不一而spring 獲取不到.(這里我也不是太明白,如果有大佬知道可以分享一波)

而我們使用spring boot的意義就在于這些獲取這些繁雜的對象,這個問題困擾了我很久.最終想到,我們可以通過上下文getbean獲取實例不知道能不能行,然后我就開始了定義:

例如我需要在bolt中使用一個服務:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
 * @author leezer
 * @date 2017/12/27
 * 存儲操作失敗時間
 **/
@service("alarmdatarepositorys")
public class alarmdatarepositorys extends redisbase implements ialarmdatarepositorys {
 private static final string erro = "erro";
 /**
  * @param type 類型
  * @param key key值
  * @return 錯誤次數
  **/
 @override
 public string geterrnumfromredis(string type,string key) {
  if(type==null || key == null){
   return null;
  }else {
   valueoperations<string, string> valueoper = primarystringredistemplate.opsforvalue();
   return valueoper.get(string.format("%s:%s:%s",erro,type,key));
  }
 }
 
 /**
  * @param type 錯誤類型
  * @param key key值
  * @param value 存儲值
  **/
 @override
 public void seterrnumtoredis(string type, string key,string value) {
  try {
   valueoperations<string, string> valueoper = primarystringredistemplate.opsforvalue();
   valueoper.set(string.format("%s:%s:%s", erro,type, key), value, dictionaries.apikeydayoflifecycle, timeunit.seconds);
  }catch (exception e){
   logger.info(dictionaries.redis_error_prefix+string.format("key為%s存入redis失敗",key));
  }
 }

這里我指定了該bean的名稱,則在bolt執行prepare時:使用getbean方法獲取了相關bean就能完成相應的操作.

然后kafka訂閱主題發送至我bolt進行相關的處理.而這里getbean的方法是在啟動bootmain函數定義:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@springbootapplication
@enabletransactionmanagement
@componentscan({"service","storm"})
@enablemongorepositories(basepackages = {"storm"})
@propertysource(value = {"classpath:service.properties", "classpath:application.properties","classpath:storm.properties"})
@importresource(locations = {
 "classpath:/configs/spring-hadoop.xml",
 "classpath:/configs/spring-hbase.xml"})
public class stormlauncher extends springbootservletinitializer {
 //設置 安全線程launcher實例
 private volatile static stormlauncher stormlauncher;
 //設置上下文
 private applicationcontext context;
 public static void main(string[] args) {
  springapplicationbuilder application = new springapplicationbuilder(stormlauncher.class);
  // application.web(false).run(args);該方式是spring boot不以web形式啟動
 application.run(args);
 stormlauncher s = new stormlauncher();
 s.setapplicationcontext(application.context());
 setstormlauncher(s);
 }
 
 private static void setstormlauncher(stormlauncher stormlauncher) {
 stormlauncher.stormlauncher = stormlauncher;
 }
 public static stormlauncher getstormlauncher() {
 return stormlauncher;
 }
 
 @override
 protected springapplicationbuilder configure(springapplicationbuilder application) {
 return application.sources(stormlauncher.class);
 }
 
 /**
 * 獲取上下文
 *
 * @return the application context
 */
 public applicationcontext getapplicationcontext() {
 return context;
 }
 
 /**
 * 設置上下文.
 *
 * @param appcontext 上下文
 */
 private void setapplicationcontext(applicationcontext appcontext) {
 this.context = appcontext;
 }
 
 /**
 * 通過自定義name獲取 實例 bean.
 *
 * @param name the name
 * @return the bean
 */
 public object getbean(string name) {
 return context.getbean(name);
 }
 
 /**
 * 通過class獲取bean.
 *
 * @param <t> the type parameter
 * @param clazz the clazz
 * @return the bean
 */
 public <t> t getbean(class<t> clazz) {
 return context.getbean(clazz);
 }
 
 /**
 * 通過name,以及clazz返回指定的bean
 *
 * @param <t> the type parameter
 * @param name the name
 * @param clazz the clazz
 * @return the bean
 */
 public <t> t getbean(string name, class<t> clazz) {
 return context.getbean(name, clazz);
 }

到此集成storm 和kafka至spring boot已經結束了,相關kafka及其他配置我會放入github上面

對了這里還有一個kafkaclient的坑:

async loop died! java.lang.nosuchmethoderror: org.apache.kafka.common.network.networksend.

項目會報kafka client 問題,這是因為storm-kafka中,kafka使用的是0.8版本,而networksend是0.9以上的版本,這里集成需要與你集成的kafka相關版本一致.

雖然集成比較簡單,但是參考都比較少,加之剛開始接觸storm所以思考比較多,也在這記錄一下.

項目地址 - github

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。

原文鏈接:https://juejin.im/post/5a4755ea51882538d31043be

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 噜噜噜噜噜在线视频 | 久草 在线| 91精品久久久久久久久中文字幕 | 簧片毛片 | 欧美日韩第一页 | 久久久久久一区 | 国产精品成人av | av在线免费观看网站 | 久久久久久久久99精品 | 99热少妇| 久久首页 | 久久久久久久久久久久国产 | 国产日韩欧美精品 | 综合久久亚洲 | 在线精品一区 | 亚洲国产精品久久久 | 日韩欧美一区二区中文字幕 | 午夜av毛片| 国产精品久久久久久久一区探花 | 日韩欧美二区 | 中文字幕精品视频 | 亚洲精品久久久久久下一站 | 毛片免费播放 | 国产98色在线 | 日韩 | 国产精品久久久久久久久大全 | 欧美福利在线 | 久久久99精品免费观看 | 在线看黄网站 | 国产精品欧美一区二区 | 国产精品尤物在线观看 | 精品国偷自产国产一区 | 亚洲一区二区三区在线视频 | 欧美天堂 | 日韩精品色 | 免费观看一级视频 | 日日夜夜精品免费视频 | 久久精品影视 | 国产精品三级久久久久久电影 | 免费在线观看黄色 | 一区二区日韩 | 一本色道久久综合狠狠躁篇怎么玩 |