將nginx日志通過filebeat收集后傳入logstash,經過logstash處理后寫入elasticsearch。filebeat只負責收集工作,logstash完成日志的格式化,數據的替換,拆分 ,以及將日志寫入elasticsearch后的索引的創建。
1、配置nginx日志格式
1
2
3
4
5
6
7
|
log_format main '$remote_addr $http_x_forwarded_for [$time_local] $server_name $request ' '$status $body_bytes_sent $http_referer ' '"$http_user_agent" ' '"$connection" ' '"$http_cookie" ' '$request_time ' '$upstream_response_time' ; |
2、安裝配置filebeat,啟用nginx module
1
2
3
|
tar -zxvf filebeat-6.2.4-linux-x86_64. tar .gz -C /usr/local cd /usr/local ; ln -s filebeat-6.2.4-linux-x86_64 filebeat cd /usr/local/filebeat |
啟用nginx模塊
1
|
. /filebeat modules enable nginx |
查看模塊
1
|
. /filebeat modules list |
創建配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
|
vim /usr/local/filebeat/blog_module_logstash .yml filebeat.modules: - module: nginx access: enabled: true var.paths: [ "/home/weblog/blog.cnfol.com_access.log" ] #error: # enabled: true # var.paths: ["/home/weblogerr/blog.cnfol.com_error.log"] output.logstash: hosts: [ "192.168.15.91:5044" ] |
啟動filebeat
1
|
. /filebeat -c blog_module_logstash.yml -e |
3、配置logstash
1
2
3
4
|
tar -zxvf logstash-6.2.4. tar .gz /usr/local cd /usr/local ; ln -s logstash-6.2.4 logstash 創建一個nginx日志的pipline文件 cd /usr/local/logstash |
logstash內置的模板目錄
1
|
vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns |
編輯 grok-patterns 添加一個支持多ip的正則
1
|
FORWORD (?:%{IPV4}[,]?[ ]?)+|%{WORD} |
官方grok
http://grokdebug.herokuapp.com/patterns#
創建logstash pipline配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
#input { # stdin {} #} # 從filebeat接受數據 input { beats { port => 5044 host => "0.0.0.0" } } filter { # 添加一個調試的開關 mutate{add_field => { "[@metadata][debug]" => true }} grok { # 過濾nginx日志 #match => { "message" => "%{NGINXACCESS_TEST2}" } #match => { "message" => '%{IPORHOST:clientip} # (?<http_x_forwarded_for>[^\#]*) # \[%{HTTPDATE:[@metadata][webtime]}\] # %{NOTSPACE:hostname} # %{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} # %{NUMBER:response} # (?:%{NUMBER:bytes}|-) # (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) # (?:"(?<http_user_agent>[^#]*)") # (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) # (?:"(?<cookies>[^#]*)") # %{NUMBER:request_time:float} # (?:%{NUMBER:upstream_response_time:float}|-)' } #match => { "message" => '(?:%{IPORHOST:clientip}|-) (?:%{TWO_IP:http_x_forwarded_for}|%{IPV4:http_x_forwarded_for}|-) \[%{HTTPDATE:[@metadata][webtime]}\] (?:%{HOSTNAME:hostname}|-) %{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) %{QS:agent} (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) (?:"(?<cookies>[^#]*)") %{NUMBER:request_time:float} (?:%{NUMBER:upstream_response_time:float}|-)' } match => { "message" => '(?:%{IPORHOST:clientip}|-) %{FORWORD:http_x_forwarded_for} \[%{HTTPDATE:[@metadata][webtime]}\] (?:%{HOSTNAME:hostname}|-) %{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) %{QS:agent} (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) %{QS:cookie} %{NUMBER:request_time:float} (?:%{NUMBER:upstream_response_time:float}|-)' } } # 將默認的@timestamp(beats收集日志的時間)的值賦值給新字段@read_tiimestamp ruby { #code => "event.set('@read_timestamp',event.get('@timestamp'))" #將時區改為東8區 code => "event.set('@read_timestamp',event.get('@timestamp').time.localtime + 8*60*60)" } # 將nginx的日志記錄時間格式化 # 格式化時間 20/May/2015:21:05:56 +0000 date { locale => "en" match => [ "[@metadata][webtime]" , "dd/MMM/yyyy:HH:mm:ss Z" ] } # 將bytes字段由字符串轉換為數字 mutate { convert => { "bytes" => "integer" } } # 將cookie字段解析成一個json #mutate { # gsub => ["cookies",'\;',','] #} # 如果有使用到cdn加速http_x_forwarded_for會有多個ip,第一個ip是用戶真實ip if [http_x_forwarded_for] =~ ", " { ruby { code => 'event.set("http_x_forwarded_for", event.get("http_x_forwarded_for").split(",")[0])' } } # 解析ip,獲得ip的地理位置 geoip { source => "http_x_forwarded_for" # # 只獲取ip的經緯度、國家、城市、時區 fields => [ "location" , "country_name" , "city_name" , "region_name" ] } # 將agent字段解析,獲得瀏覽器、系統版本等具體信息 useragent { source => "agent" target => "useragent" } #指定要刪除的數據 #mutate{remove_field=>["message"]} # 根據日志名設置索引名的前綴 ruby { code => 'event.set("@[metadata][index_pre]",event.get("source").split("/")[-1])' } # 將@timestamp 格式化為2019.04.23 ruby { code => 'event.set("@[metadata][index_day]",event.get("@timestamp").time.localtime.strftime("%Y.%m.%d"))' } # 設置輸出的默認索引名 mutate { add_field => { #"[@metadata][index]" => "%{@[metadata][index_pre]}_%{+YYYY.MM.dd}" "[@metadata][index]" => "%{@[metadata][index_pre]}_%{@[metadata][index_day]}" } } # 將cookies字段解析成json # mutate { # gsub => [ # "cookies", ";", ",", # "cookies", "=", ":" # ] # #split => {"cookies" => ","} # } # json_encode { # source => "cookies" # target => "cookies_json" # } # mutate { # gsub => [ # "cookies_json", ',', '","', # "cookies_json", ':', '":"' # ] # } # json { # source => "cookies_json" # target => "cookies2" # } # 如果grok解析存在錯誤,將錯誤獨立寫入一個索引 if "_grokparsefailure" in [tags] { #if "_dateparsefailure" in [tags] { mutate { replace => { #"[@metadata][index]" => "%{@[metadata][index_pre]}_failure_%{+YYYY.MM.dd}" "[@metadata][index]" => "%{@[metadata][index_pre]}_failure_%{@[metadata][index_day]}" } } # 如果不存在錯誤就刪除message } else { mutate{remove_field=>[ "message" ]} } } output { if [@metadata][debug]{ # 輸出到rubydebuyg并輸出metadata stdout{codec => rubydebug{metadata => true }} } else { # 將輸出內容轉換成 "." stdout{codec => dots} # 將輸出到指定的es elasticsearch { hosts => [ "192.168.15.160:9200" ] index => "%{[@metadata][index]}" document_type => "doc" } } } |
啟動logstash
1
|
nohup bin /logstash -f test_pipline2.conf & |
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:http://www.zhengdazhi.com/archives/1744