国产片侵犯亲女视频播放_亚洲精品二区_在线免费国产视频_欧美精品一区二区三区在线_少妇久久久_在线观看av不卡

服務(wù)器之家:專注于服務(wù)器技術(shù)及軟件下載分享
分類導(dǎo)航

云服務(wù)器|WEB服務(wù)器|FTP服務(wù)器|郵件服務(wù)器|虛擬主機|服務(wù)器安全|DNS服務(wù)器|服務(wù)器知識|Nginx|IIS|Tomcat|

服務(wù)器之家 - 服務(wù)器技術(shù) - Nginx - nginx lua集成kafka的實現(xiàn)方法

nginx lua集成kafka的實現(xiàn)方法

2020-01-10 18:01-小魚- Nginx

這篇文章主要介紹了nginx lua集成kafka的實現(xiàn)方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

第一步:進入opresty目錄

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[root@node03 openresty]# cd /export/servers/openresty/
[root@node03 openresty]# ll
total 356
drwxr-xr-x 2 root root  4096 Jul 26 11:33 bin
drwxrwxr-x 44 1000 1000  4096 Jul 26 11:31 build
drwxrwxr-x 43 1000 1000  4096 Nov 13 2017 bundle
-rwxrwxr-x 1 1000 1000 45908 Nov 13 2017 configure
-rw-rw-r-- 1 1000 1000 22924 Nov 13 2017 COPYRIGHT
drwxr-xr-x 6 root root  4096 Jul 26 11:33 luajit
drwxr-xr-x 6 root root  4096 Aug 1 08:14 lualib
-rw-r--r-- 1 root root  5413 Jul 26 11:32 Makefile
drwxr-xr-x 11 root root  4096 Jul 26 11:35 nginx
drwxrwxr-x 2 1000 1000  4096 Nov 13 2017 patches
drwxr-xr-x 44 root root  4096 Jul 26 11:33 pod
-rw-rw-r-- 1 1000 1000  3689 Nov 13 2017 README.markdown
-rw-rw-r-- 1 1000 1000  8690 Nov 13 2017 README-win32.txt
-rw-r--r-- 1 root root 218352 Jul 26 11:33 resty.index
drwxr-xr-x 5 root root  4096 Jul 26 11:33 site
drwxr-xr-x 2 root root  4096 Aug 1 10:54 testlua
drwxrwxr-x 2 1000 1000  4096 Nov 13 2017 util
[root@node03 openresty]#

說明:接下來我們關(guān)注兩個目錄 lualib 和 nginx

? 1.lualib: 是存放opresty所需要的集成軟件包的

? 2.nginx: 是nginx服務(wù)目錄

接下來,我們進入lualib目錄一看究竟:

?
1
2
3
4
5
6
7
8
[root@node03 openresty]# cd lualib/
[root@node03 lualib]# ll
total 116
-rwxr-xr-x 1 root root 101809 Jul 26 11:33 cjson.so
drwxr-xr-x 3 root root  4096 Jul 26 11:33 ngx
drwxr-xr-x 2 root root  4096 Jul 26 11:33 rds
drwxr-xr-x 2 root root  4096 Jul 26 11:33 redis
drwxr-xr-x 9 root root  4096 Aug 1 10:34 resty

這里我們看到了redis和ngx集成軟件包,說明我們可以之間使用nginx和redis而無需導(dǎo)入任何依賴包!!!!

下面看看resty里面有些說明呢????

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
[root@node03 lualib]# cd resty/
[root@node03 resty]# ll
total 152
-rw-r--r-- 1 root root 6409 Jul 26 11:33 aes.lua
drwxr-xr-x 2 root root 4096 Jul 26 11:33 core
-rw-r--r-- 1 root root  596 Jul 26 11:33 core.lua
drwxr-xr-x 2 root root 4096 Jul 26 11:33 dns
drwxr-xr-x 2 root root 4096 Aug 1 10:42 kafka  #這是我們自己導(dǎo)入的
drwxr-xr-x 2 root root 4096 Jul 26 11:33 limit
-rw-r--r-- 1 root root 4616 Jul 26 11:33 lock.lua
drwxr-xr-x 2 root root 4096 Jul 26 11:33 lrucache
-rw-r--r-- 1 root root 4620 Jul 26 11:33 lrucache.lua
-rw-r--r-- 1 root root 1211 Jul 26 11:33 md5.lua
-rw-r--r-- 1 root root 14544 Jul 26 11:33 memcached.lua
-rw-r--r-- 1 root root 21577 Jul 26 11:33 mysql.lua
-rw-r--r-- 1 root root  616 Jul 26 11:33 random.lua
-rw-r--r-- 1 root root 9227 Jul 26 11:33 redis.lua
-rw-r--r-- 1 root root 1192 Jul 26 11:33 sha1.lua
-rw-r--r-- 1 root root 1045 Jul 26 11:33 sha224.lua
-rw-r--r-- 1 root root 1221 Jul 26 11:33 sha256.lua
-rw-r--r-- 1 root root 1045 Jul 26 11:33 sha384.lua
-rw-r--r-- 1 root root 1359 Jul 26 11:33 sha512.lua
-rw-r--r-- 1 root root  236 Jul 26 11:33 sha.lua
-rw-r--r-- 1 root root  698 Jul 26 11:33 string.lua
-rw-r--r-- 1 root root 5178 Jul 26 11:33 upload.lua
drwxr-xr-x 2 root root 4096 Jul 26 11:33 upstream
drwxr-xr-x 2 root root 406 Jul 26 11:33 websocket

這里我們看到了熟悉的mysql.lua和redis.lua,好了其他的先不要管

注意:這里的 kafka 這個包是沒有的,說明opnresty么有集成kafka。此處我已經(jīng)提前導(dǎo)入啦kafka集成包

我們看看kafka里面多有哪些包:

?
1
2
3
4
5
6
7
8
9
10
11
[root@node03 resty]# cd kafka
[root@node03 kafka]# ll
total 48
-rw-r--r-- 1 root root 1369 Aug 1 10:42 broker.lua
-rw-r--r-- 1 root root 5537 Aug 1 10:42 client.lua
-rw-r--r-- 1 root root  710 Aug 1 10:42 errors.lua
-rw-r--r-- 1 root root 10718 Aug 1 10:42 producer.lua
-rw-r--r-- 1 root root 4072 Aug 1 10:42 request.lua
-rw-r--r-- 1 root root 2118 Aug 1 10:42 response.lua
-rw-r--r-- 1 root root 1494 Aug 1 10:42 ringbuffer.lua
-rw-r--r-- 1 root root 4845 Aug 1 10:42 sendbuffer.lua

附上 kafka 集成包:kafka.rar

第二步:創(chuàng)建kafka測試lua文件

1.退回到openresty

?
1
[root@node03 kafka]# cd /export/servers/openresty/

2.創(chuàng)建測試文件

?
1
2
[root@node03 openresty]# mkdir -r testlua
#這里文件名自己取,文件位置自己定,但必須找得到

這里文件名自己取,文件位置自己定,但必須找得到!!!!!!!!!!!下面會用到!!!!!!!!!!

3.進入剛剛創(chuàng)建的文件夾并創(chuàng)建kafkalua.lua腳本文件

創(chuàng)建文件:vim kafkalua.lua或者touch kafkalua.lua

?
1
2
3
4
[root@node03 openresty]# cd testlua/
[root@node03 testlua]# ll
total 8
-rw-r--r-- 1 root root 3288 Aug 1 10:54 kafkalua.lua

kafkalua.lua:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
--測試語句可以不用
ngx.say('hello kafka file configuration successful!!!!!!')
 
--數(shù)據(jù)采集閾值限制,如果lua采集超過閾值,則不采集
local DEFAULT_THRESHOLD = 100000
-- kafka分區(qū)數(shù)
local PARTITION_NUM = 6
-- kafka主題名稱
local TOPIC = 'B2CDATA_COLLECTION1'
-- 輪詢器共享變量KEY值
local POLLING_KEY = "POLLING_KEY"
-- kafka集群(定義kafka broker地址,ip需要和kafka的host.name配置一致)
local function partitioner(key, num, correlation_id)
  return tonumber(key)
end
--kafka broker列表
local BROKER_LIST = {{host="192.168.52.100",port=9092},{host="192.168.52.110",port=9092},{host="192.168.52.120",port=9092}}
--kafka參數(shù),
local CONNECT_PARAMS = { producer_type = "async", socket_timeout = 30000, flush_time = 10000, request_timeout = 20000, partitioner = partitioner }
-- 共享內(nèi)存計數(shù)器,用于kafka輪詢使用
local shared_data = ngx.shared.shared_data
local pollingVal = shared_data:get(POLLING_KEY)
if not pollingVal then
  pollingVal = 1
  shared_data:set(POLLING_KEY, pollingVal)
end
--獲取每一條消息的計數(shù)器,對PARTITION_NUM取余數(shù),均衡分區(qū)
local partitions = '' .. (tonumber(pollingVal) % PARTITION_NUM)
shared_data:incr(POLLING_KEY, 1)
 
-- 并發(fā)控制
local isGone = true
--獲取ngx.var.connections_active進行過載保護,即如果當(dāng)前活躍連接數(shù)超過閾值進行限流保護
if tonumber(ngx.var.connections_active) > tonumber(DEFAULT_THRESHOLD) then
  isGone = false
end
-- 數(shù)據(jù)采集
if isGone then
 
  local time_local = ngx.var.time_local
  if time_local == nil then
    time_local = ""
  end
 
  local request = ngx.var.request
  if request == nil then
    request = ""
  end
 
  local request_method = ngx.var.request_method
  if request_method == nil then
    request_method = ""
  end
 
  local content_type = ngx.var.content_type
  if content_type == nil then
    content_type = ""
  end
  ngx.req.read_body()
  local request_body = ngx.var.request_body
  if request_body == nil then
    request_body = ""
  end
 
  local http_referer = ngx.var.http_referer
  if http_referer == nil then
    http_referer = ""
  end
 
  local remote_addr = ngx.var.remote_addr
  if remote_addr == nil then
    remote_addr = ""
  end
 
  local http_user_agent = ngx.var.http_user_agent
  if http_user_agent == nil then
    http_user_agent = ""
  end
 
  local time_iso8601 = ngx.var.time_iso8601
  if time_iso8601 == nil then
    time_iso8601 = ""
  end
 
  local server_addr = ngx.var.server_addr
  if server_addr == nil then
    server_addr = ""
  end
 
  local http_cookie = ngx.var.http_cookie
  if http_cookie == nil then
    http_cookie = ""
  end
--封裝數(shù)據(jù)
  local message = time_local .."#CS#".. request .."#CS#".. request_method .."#CS#".. content_type .."#CS#".. request_body .."#CS#".. http_referer .."#CS#".. remote_addr .."#CS#".. http_user_agent .."#CS#".. time_iso8601 .."#CS#".. server_addr .."#CS#".. http_cookie;
--引入kafka的producer
local producer = require "resty.kafka.producer"
--創(chuàng)建producer
local bp = producer:new(BROKER_LIST, CONNECT_PARAMS)
--發(fā)送數(shù)據(jù)
local ok, err = bp:send(TOPIC, partitions, message)
--打印錯誤日志
  if not ok then
    ngx.log(ngx.ERR, "kafka send err:", err)
    return
  end
end

第三步:修改nginx配置文件nginx.conf

1.進入ngin/conf目錄

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[root@node03 openresty]# cd /export/servers/openresty/nginx/conf/
[root@node03 conf]# ll
total 76
-rw-r--r-- 1 root root 1077 Jul 26 11:33 fastcgi.conf
-rw-r--r-- 1 root root 1077 Jul 26 11:33 fastcgi.conf.default
-rw-r--r-- 1 root root 1007 Jul 26 11:33 fastcgi_params
-rw-r--r-- 1 root root 1007 Jul 26 11:33 fastcgi_params.default
-rw-r--r-- 1 root root 2837 Jul 26 11:33 koi-utf
-rw-r--r-- 1 root root 2223 Jul 26 11:33 koi-win
-rw-r--r-- 1 root root 5170 Jul 26 11:33 mime.types
-rw-r--r-- 1 root root 5170 Jul 26 11:33 mime.types.default
-rw-r--r-- 1 root root 3191 Aug 1 10:52 nginx.conf
-rw-r--r-- 1 root root 2656 Jul 26 11:33 nginx.conf.default
-rw-r--r-- 1 root root 636 Jul 26 11:33 scgi_params
-rw-r--r-- 1 root root 636 Jul 26 11:33 scgi_params.default
-rw-r--r-- 1 root root 664 Jul 26 11:33 uwsgi_params
-rw-r--r-- 1 root root 664 Jul 26 11:33 uwsgi_params.default
-rw-r--r-- 1 root root 3610 Jul 26 11:33 win-utf

2.修改nginx.conf

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
[root@node03 conf]# vim nginx.conf
 
    #1.說明找到第一個server
    #2.在server上面添加兩行代碼如下
    #3.在server里面添加kafka相關(guān)的代碼如下
    
    
#------------------添加的代碼---------------------------------------
 #開啟共享字典,設(shè)置內(nèi)存大小為10M,供每個nginx的線程消費
 lua_shared_dict shared_data 10m;
 #配置本地域名解析
 resolver 127.0.0.1;
#------------------添加的代碼---------------------------------------
 
 server {
    listen    80;
    server_name localhost;
 
    #charset koi8-r;
 
    #access_log logs/host.access.log main;
    location / {
      root  html;
      index index.html index.htm;
    }
 
    #------------------添加的代碼---------------------------------------
    location /kafkalua { #這里的kafkalua就是工程名字,不加默認(rèn)為空
      #開啟nginx監(jiān)控
      stub_status on;
      #加載lua文件
      default_type text/html;
      #指定kafka的lua文件位置,就是我們剛才創(chuàng)建的kafkalua.lua(前面已經(jīng)強調(diào)要記住的!!!!)
      content_by_lua_file /export/servers/openresty/testlua/kafkalua.lua;
    }
    #------------------添加的代碼---------------------------------------
}

說明:location /kafkalua{...}這里的kafkalua是工程名,可以隨意取也可以不取,但是必須要記住!!!

看到我們上面配置了兩個location,第一個為location /{...}第二個為location /kafkalua{...}那么他們有什么區(qū)別呢???先向下看,迷霧將會慢慢揭開。

第四步:啟動nginx

1.進入nginx/sbin

?
1
2
3
4
[root@node03 sbin]# cd /export/servers/openresty/nginx/sbin/
[root@node03 sbin]# ll
total 16356
-rwxr-xr-x 1 root root 16745834 Jul 26 11:33 nginx

2.測試配置文件是否正確

?
1
2
3
4
[root@node03 sbin]# nginx -t
nginx: the configuration file /export/servers/openresty/nginx/conf/nginx.conf syntax is ok
nginx: configuration file /export/servers/openresty/nginx/conf/nginx.conf test is successful
#看到已經(jīng)成功啦

3.啟動nginx

?
1
2
[root@node03 sbin]# nginx
#不顯示任何東西一般是成功啦

4.查看nginx是否啟動成功

?
1
2
3
4
5
6
[root@node03 sbin]# ps -ef | grep nginx
root    3730   1 0 09:24 ?    00:00:00 nginx: master process nginx
nobody   3731  3730 0 09:24 ?    00:00:20 nginx: worker process is shutting down
nobody   5766  3730 0 12:17 ?    00:00:00 nginx: worker process
root    5824  3708 0 12:24 pts/1  00:00:00 grep nginx
<span class="hljs-comment">#看到有兩個nginx進程,表示成功le</span>

5.瀏覽器訪問nginx

在瀏覽器輸入:node03/kafkalua

說明:如何么有配置hosts則輸入openresty所在設(shè)備的地址如:192.168.52.120/kafkalua

nginx lua集成kafka的實現(xiàn)方法

在瀏覽器輸入:node03/或者 192.168.52.120/

nginx lua集成kafka的實現(xiàn)方法

再在瀏覽器輸入:node03:80/kafkalua 和 node03:80/試試 搬來nginx.conf來看看:

node03:80/kafkalua 這里的nide03是服務(wù)器的別名或者之間寫文服務(wù)器地址,80是【listen 80;】配置的監(jiān)聽端口,80端口可以省略不寫,如果這寫成【listen 8088;】那么瀏覽器需輸入 node03:8088/kafkalua (這里不能省略8088),kafkalua是工程名。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
server {
   listen    80;
   server_name localhost;
 
   #charset koi8-r;
 
   #access_log logs/host.access.log main;
   location / {
     root  html;
     index index.html index.htm;
   }
 
   #------------------添加的代碼---------------------------------------
   location /kafkalua { #這里的kafkalua就是工程名字,不加默認(rèn)為空
     #開啟nginx監(jiān)控
     stub_status on;
     #加載lua文件
     default_type text/html;
     #指定kafka的lua文件位置,就是我們剛才創(chuàng)建的kafkalua.lua(前面已經(jīng)強調(diào)要記住的!!!!)
     content_by_lua_file /export/servers/openresty/testlua/kafkalua.lua;
   }

第五步:創(chuàng)建測試爬蟲程序

1.創(chuàng)建maven工程導(dǎo)入依賴

?
1
2
3
4
5
6
7
8
9
10
11
12
<dependencies>
   <dependency>
     <groupId>org.jsoup</groupId>
     <artifactId>jsoup</artifactId>
     <version>1.11.3</version>
   </dependency>
   <dependency>
     <groupId>org.apache.httpcomponents</groupId>
     <artifactId>httpclient</artifactId>
     <version>4.5.4</version>
   </dependency>
 </dependencies>

2.偽爬蟲程序

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
public class SpiderGoAirCN {
  private static String basePath = "http://node03/kafkalua";
  public static void main(String[] args) throws Exception {
    for (int i = 0; i < 50000; i++) {
      // 請求查詢信息
      spiderQueryao();
      // 請求html
      spiderHtml();
      // 請求js
      spiderJs();
      // 請求css
      spiderCss();
      // 請求png
      spiderPng();
      // 請求jpg
      spiderJpg();
      Thread.sleep(100);
    }
  }
 
  /**
   *
   * @throws Exception
   */
  public static void spiderQueryao() throws Exception {
    // 1.指定目標(biāo)網(wǎng)站   ^.*/B2C40/query/jaxb/direct/query.ao.*$
    String url = basePath + "/B2C40/query/jaxb/direct/query.ao";
    // 2.發(fā)起請求
    HttpPost httpPost = new HttpPost(url);
    // 3. 設(shè)置請求參數(shù)
    httpPost.setHeader("Time-Local", getLocalDateTime());
    httpPost.setHeader("Requst",
          "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
    httpPost.setHeader("Request Method", "POST");
    httpPost.setHeader("Content-Type",
        "application/x-www-form-urlencoded; charset=UTF-8");
    httpPost.setHeader(
        "Referer",
        "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1="
            + getGoTime() + "&at=1&ct=0&it=0");
    httpPost.setHeader("Remote Address", "192.168.56.80");
    httpPost.setHeader(
        "User-Agent",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
    httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
    httpPost.setHeader("Server Address", "243.45.78.132");
    httpPost.setHeader(
        "Cookie",
        "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D"
            + getGoTime()
            + "%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1("
            + getGoTime() + ")");
    // 4.設(shè)置請求參數(shù)
    ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
    parameters
        .add(new BasicNameValuePair(
            "json",
            "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"));
    httpPost.setEntity(new UrlEncodedFormEntity(parameters));
    // 5. 發(fā)起請求
    CloseableHttpClient httpClient = HttpClients.createDefault();
    CloseableHttpResponse response = httpClient.execute(httpPost);
    // 6.獲取返回值
    System.out.println(response != null);
  }
 
  public static void spiderHtml() throws Exception {
    // 1.指定目標(biāo)網(wǎng)站     ^.*html.*$
    String url = basePath + "/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=CTU&d1=2018-01-17&at=1&ct=0&it=0";
    // 2.發(fā)起請求
    HttpPost httpPost = new HttpPost(url);
    // 3. 設(shè)置請求參數(shù)
    httpPost.setHeader("Time-Local", getLocalDateTime());
    httpPost.setHeader("Requst",
        "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
    httpPost.setHeader("Request Method", "POST");
    httpPost.setHeader("Content-Type",
        "application/x-www-form-urlencoded; charset=UTF-8");
    httpPost.setHeader(
        "Referer",
        "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0");
    httpPost.setHeader("Remote Address", "192.168.56.1");
    httpPost.setHeader(
        "User-Agent",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
    httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
    httpPost.setHeader("Server Address", "192.168.56.80");
    httpPost.setHeader(
        "Cookie",
        "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)");
    // 4.設(shè)置請求參數(shù)
    // httpPost.setEntity(new StringEntity(
    // "depcity=CAN&arrcity=WUH&flightdate=20180220&adultnum=1&childnum=0&infantnum=0&cabinorder=0&airline=1&flytype=0&international=0&action=0&segtype=1&cache=0&preUrl=&isMember="));
    ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
    parameters
        .add(new BasicNameValuePair(
            "json",
            "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"));
    httpPost.setEntity(new UrlEncodedFormEntity(parameters));
    // 5. 發(fā)起請求
    CloseableHttpClient httpClient = HttpClients.createDefault();
    CloseableHttpResponse response = httpClient.execute(httpPost);
    // 6.獲取返回值
    System.out.println(response != null);
  }
 
  public static void spiderJs() throws Exception {
 
    // 1.指定目標(biāo)網(wǎng)站
    String url = basePath +"/B2C40/dist/main/modules/common/requireConfig.js";
    // 2.發(fā)起請求
    HttpPost httpPost = new HttpPost(url);
    // 3. 設(shè)置請求參數(shù)
    httpPost.setHeader("Time-Local", getLocalDateTime());
    httpPost.setHeader("Requst",
        "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
    httpPost.setHeader("Request Method", "POST");
    httpPost.setHeader("Content-Type",
        "application/x-www-form-urlencoded; charset=UTF-8");
    httpPost.setHeader(
        "Referer",
        "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0");
    httpPost.setHeader("Remote Address", "192.168.56.1");
    httpPost.setHeader(
        "User-Agent",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
    httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
    httpPost.setHeader("Server Address", "192.168.56.80");
    httpPost.setHeader(
        "Cookie",
        "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)");
    // 4.設(shè)置請求參數(shù)
    ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
    parameters
        .add(new BasicNameValuePair(
            "json",
            "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"));
    httpPost.setEntity(new UrlEncodedFormEntity(parameters));
    // 5. 發(fā)起請求
    CloseableHttpClient httpClient = HttpClients.createDefault();
    CloseableHttpResponse response = httpClient.execute(httpPost);
    // 6.獲取返回值
    System.out.println(response != null);
  }
 
  public static void spiderCss() throws Exception {
 
    // 1.指定目標(biāo)網(wǎng)站
    String url = basePath +"/B2C40/dist/main/css/flight.css";
    // 2.發(fā)起請求
    HttpPost httpPost = new HttpPost(url);
    // 3. 設(shè)置請求參數(shù)
    httpPost.setHeader("Time-Local", getLocalDateTime());
    httpPost.setHeader("Requst",
        "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
    httpPost.setHeader("Request Method", "POST");
    httpPost.setHeader("Content-Type",
        "application/x-www-form-urlencoded; charset=UTF-8");
    httpPost.setHeader("Referer",
        "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html");
    httpPost.setHeader("Remote Address", "192.168.56.1");
    httpPost.setHeader(
        "User-Agent",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
    httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
    httpPost.setHeader("Server Address", "192.168.56.80");
    httpPost.setHeader(
        "Cookie",
        "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)");
    // 4.設(shè)置請求參數(shù)
    ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
    parameters
        .add(new BasicNameValuePair(
            "json",
            "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"));
    httpPost.setEntity(new UrlEncodedFormEntity(parameters));
    // 5. 發(fā)起請求
    CloseableHttpClient httpClient = HttpClients.createDefault();
    CloseableHttpResponse response = httpClient.execute(httpPost);
    // 6.獲取返回值
    System.out.println(response != null);
  }
 
  public static void spiderPng() throws Exception {
 
    // 1.指定目標(biāo)網(wǎng)站
    String url =basePath + "/B2C40/dist/main/images/common.png";
    // 2.發(fā)起請求
    HttpPost httpPost = new HttpPost(url);
    // 3. 設(shè)置請求參數(shù)
    httpPost.setHeader("Time-Local", getLocalDateTime());
    httpPost.setHeader("Requst",
        "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
    httpPost.setHeader("Request Method", "POST");
    httpPost.setHeader("Content-Type",
        "application/x-www-form-urlencoded; charset=UTF-8");
    httpPost.setHeader(
        "Referer",
        "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0");
    httpPost.setHeader("Remote Address", "192.168.56.1");
    httpPost.setHeader(
        "User-Agent",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
    httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
    httpPost.setHeader("Server Address", "192.168.56.80");
    httpPost.setHeader(
        "Cookie",
        "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)");
    // 4.設(shè)置請求參數(shù)
    ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
    parameters
        .add(new BasicNameValuePair(
            "json",
            "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"));
    httpPost.setEntity(new UrlEncodedFormEntity(parameters));
    // 5. 發(fā)起請求
    CloseableHttpClient httpClient = HttpClients.createDefault();
    CloseableHttpResponse response = httpClient.execute(httpPost);
    // 6.獲取返回值
    System.out.println(response != null);
  }
 
  public static void spiderJpg() throws Exception {
 
    // 1.指定目標(biāo)網(wǎng)站
    String url = basePath +"/B2C40/dist/main/images/loadingimg.jpg";
    // 2.發(fā)起請求
    HttpPost httpPost = new HttpPost(url);
    // 3. 設(shè)置請求參數(shù)
    httpPost.setHeader("Time-Local", getLocalDateTime());
    httpPost.setHeader("Requst",
        "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
    httpPost.setHeader("Request Method", "POST");
    httpPost.setHeader("Content-Type",
        "application/x-www-form-urlencoded; charset=UTF-8");
    httpPost.setHeader(
        "Referer",
        "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0");
    httpPost.setHeader("Remote Address", "192.168.56.1");
    httpPost.setHeader(
        "User-Agent",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
    httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
    httpPost.setHeader("Server Address", "192.168.56.80");
    httpPost.setHeader(
        "Cookie",
        "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)");
    // 4.設(shè)置請求參數(shù)
    ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
    parameters
        .add(new BasicNameValuePair(
            "json",
            "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"));
    httpPost.setEntity(new UrlEncodedFormEntity(parameters));
    // 5. 發(fā)起請求
    CloseableHttpClient httpClient = HttpClients.createDefault();
    CloseableHttpResponse response = httpClient.execute(httpPost);
    // 6.獲取返回值
    System.out.println(response != null);
  }
 
  public static String getLocalDateTime() {
    DateFormat df = new SimpleDateFormat("dd/MMM/yyyy'T'HH:mm:ss +08:00",
        Locale.ENGLISH);
    String nowAsISO = df.format(new Date());
    return nowAsISO;
 
  }
 
  public static String getISO8601Timestamp() {
    DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss+08:00");
    String nowAsISO = df.format(new Date());
    return nowAsISO;
  }
 
  public static String getGoTime() {
    DateFormat df = new SimpleDateFormat("yyyy-MM-dd");
    String nowAsISO = df.format(new Date());
    return nowAsISO;
  }
 
  public static String getBackTime() {
    Date date = new Date();// 取時間
    Calendar calendar = new GregorianCalendar();
    calendar.setTime(date);
    calendar.add(calendar.DATE, +1);// 把日期往前減少一天,若想把日期向后推一天則將負(fù)數(shù)改為正數(shù)
    date = calendar.getTime();
    SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
    String dateString = formatter.format(date);
    return dateString;
  }
}

第六步:啟動kafka

1.創(chuàng)建主題topic

?
1
2
[root@node01 bin]# kafka-topics.sh --zookeeper node01:2181 --partitions 3
--replication-factor 3 --create --topic B2CDATA_COLLECTION1

2.開啟kafka消費者

?
1
2
[root@node01 bin]# kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092
--topic B2CDATA_COLLECTION1

第七步:開啟爬蟲程序并觀察結(jié)果

1.啟動爬蟲程序

2.觀察消費者窗口如下

nginx lua集成kafka的實現(xiàn)方法

第八步:啟動kafka-manager觀察

1.啟動kafka-manager

?
1
2
3
4
5
6
7
8
9
10
11
[root@node01 conf]# cd /export/servers/kafka-manager-1.3.3.23/bin/
[root@node01 bin]# ll
total 36
-rwxr-xr-x 1 root root 13747 May 1 06:27 kafka-manager
-rw-r--r-- 1 root root 9975 May 1 06:27 kafka-manager.bat
-rwxr-xr-x 1 root root 1383 May 1 06:27 log-config
-rw-r--r-- 1 root root  105 May 1 06:27 log-config.bat
[root@node01 bin]#
 
#啟動
[root@node01 bin]# ./kafka-manager

啟動后的窗口:

nginx lua集成kafka的實現(xiàn)方法

2.瀏覽器訪問

瀏覽器輸入:node01:9000

nginx lua集成kafka的實現(xiàn)方法

kafka manager使用不做講解,觀察B2CDATA_COLLECTION1主題消費情況:

? 有三個分區(qū),每個分區(qū)消費的消息差多說明成功啦,

? 如果不一樣,則是kafkalua.lua 腳本中沒有配置分區(qū)策略,默認(rèn)分區(qū)會導(dǎo)致 數(shù)據(jù)傾斜 我們需配置自己的分區(qū)策略!

nginx lua集成kafka的實現(xiàn)方法

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持服務(wù)器之家。

原文鏈接:https://www.cnblogs.com/-xiaoyu-/p/11294905.html

延伸 · 閱讀

精彩推薦
Weibo Article 1 Weibo Article 2 Weibo Article 3 Weibo Article 4 Weibo Article 5 Weibo Article 6 Weibo Article 7 Weibo Article 8 Weibo Article 9 Weibo Article 10 Weibo Article 11 Weibo Article 12 Weibo Article 13 Weibo Article 14 Weibo Article 15 Weibo Article 16 Weibo Article 17 Weibo Article 18 Weibo Article 19 Weibo Article 20 Weibo Article 21 Weibo Article 22 Weibo Article 23 Weibo Article 24 Weibo Article 25 Weibo Article 26 Weibo Article 27 Weibo Article 28 Weibo Article 29 Weibo Article 30 Weibo Article 31 Weibo Article 32 Weibo Article 33 Weibo Article 34 Weibo Article 35 Weibo Article 36 Weibo Article 37 Weibo Article 38 Weibo Article 39 Weibo Article 40
主站蜘蛛池模板: 亚洲a网| 不卡一区二区三区四区 | 成人精品一区二区三区 | www.伊人网 | 日韩免费 | 一大道一二三区不卡 | 国产日产精品一区二区三区四区 | 午夜av电影 | 日韩理伦片在线观看视频播放 | 神马影院一区二区三区 | 中文字幕av在线 | 日本精品在线观看 | 亚洲欧美日韩精品 | 日韩欧美国产一区二区三区 | 黄色片在线 | 亚洲精品在线视频观看 | 国产成人在线一区二区 | 五月婷综合 | 精品欧美一区二区三区久久久 | 国产精品久久久久久久久 | 黄色二区 | 欧美黄色免费网址 | 四虎影视在线观看 | 国产一区网站 | 情一色一乱一欲一区二区 | 国产在线观看一区二区三区 | 一区二区欧美在线 | 亚洲欧洲视频 | 亚洲专区 中文字幕 | 日韩看片| 成人a视频 | 中日韩免费视频 | 成人乱码一区二区三区av | 日韩在线精品强乱中文字幕 | 亚洲综合久久久 | 日韩国产欧美 | 国产精品视频导航 | 91免费在线看 | 成人精品一区二区 | 久久精品中文字幕 | 亚洲综合区 |