本文不涉及ElasticSearch具體原理,只記錄如何快速的導入mysql中的數據進行全文檢索。
工作中需要實現一個搜索功能,并且導入現有數據庫數據,組長推薦用ElasticSearch實現,網上翻一通教程,都是比較古老的文章了,無奈只能自己摸索,參考ES的文檔,總算是把服務搭起來了,記錄下,希望有同樣需求的朋友可以少走彎路,能按照這篇教程快速的搭建一個可用的ElasticSearch服務。
ES的搭建
ES搭建有直接下載zip文件,也有docker容器的方式,相對來說,docker更適合我們跑ES服務。可以方便的搭建集群或建立測試環境。這里使用的也是容器方式,首先我們需要一份Dockerfile:
1
2
3
4
5
6
7
8
9
10
|
FROM docker.elastic.co/elasticsearch/elasticsearch-oss:6.0.0 # 提交配置 包括新的elasticsearch.yml 和 keystore.jks文件 COPY --chown=elasticsearch:elasticsearch conf/ /usr/share/elasticsearch/config/ # 安裝ik RUN ./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v6.0.0/elasticsearch-analysis-ik-6.0.0.zip # 安裝readonlyrest RUN ./bin/elasticsearch-plugin install https://github.com/HYY-yu/BezierCurveDemo/raw/master/readonlyrest-1.16.14_es6.0.0.zip USER elasticsearch CMD ./bin/elasticsearch |
這里對上面的操作做一下說明:
- 首先在Dockerfile下的同級目錄中需要建立一個conf文件夾,保存elasticsearch.yml文件(稍后給出)和keystore.jks。(jks是自簽名文件,用于https,如何生成請自行搜索)
- ik是一款很流行的中文分詞庫,使用它來支持中文搜索。
- readonlyrest是一款開源的ES插件,用于用戶管理、安全驗證,土豪可以使用ES自帶的X-pack包,有更完善的安全功能。
elactic配置 elasticsearch.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
cluster.name: "docker-cluster" network.host: 0.0.0.0 # minimum_master_nodes need to be explicitly set when bound on a public IP # set to 1 to allow single node clusters # Details: https://github.com/elastic/elasticsearch/pull/17288 discovery.zen.minimum_master_nodes: 1 # 禁止系統對ES交換內存 bootstrap.memory_lock: true http.type: ssl_netty4 readonlyrest: enable: true ssl: enable: true keystore_file: "server.jks" keystore_pass: server key_pass: server access_control_rules: - name: "Block 1 - ROOT" type: allow groups: ["admin"] - name: "User read only - paper" groups: ["user"] indices: ["paper*"] actions: ["indices:data/read/*"] users: - username: root auth_key_sha256: cb7c98bae153065db931980a13bd45ee3a77cb8f27a7dfee68f686377acc33f1 groups: ["admin"] - username: xiaoming auth_key: xiaoming:xiaoming groups: ["user"] |
這里bootstrap.memory_lock: true是個坑,禁止交換內存這里文檔已經說明了,有的os會在運行時把暫時不用的內存交換到硬盤的一塊區域,然而這種行為會讓ES的資源占用率飆升,甚至讓系統無法響應。
配置文件里已經很明顯了,一個root用戶屬于admin組,而admin有所有權限,xiaoming同學因為在user組,只能訪問paper索引,并且只能讀取,不能操作。更詳細的配置請見:readonlyrest文檔
至此,ES的準備工作算是做完了,docker build -t ESImage:tag 一下,docker run -p 9200:9200 ESImage:Tag跑起來。
如果https://127.0.0.1:9200/返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
{ "name" : "VaKwrIR" , "cluster_name" : "docker-cluster" , "cluster_uuid" : "YsYdOWKvRh2swz907s2m_w" , "version" : { "number" : "6.0.0" , "build_hash" : "8f0685b" , "build_date" : "2017-11-10T18:41:22.859Z" , "build_snapshot" : false , "lucene_version" : "7.0.1" , "minimum_wire_compatibility_version" : "5.6.0" , "minimum_index_compatibility_version" : "5.0.0" }, "tagline" : "You Know, for Search" } |
我們本次教程的主角算是出場了,分享幾個常用的API調戲調試ES用:
{{url}}替換成你本地的ES地址。
- 查看所有插件:{{url}}/_cat/plugins?v
- 查看所有索引:{{url}}/_cat/indices?v
- 對ES進行健康檢查:{{url}}/_cat/health?v
- 查看當前的磁盤占用率:{{url}}/_cat/allocation?v
導入MYSQL數據
這里我使用的是MYSQL數據,其實其它的數據庫也是一樣,關鍵在于如何導入,網上教程會推薦Logstash、Beat、ES的mysql插件進行導入,我也都實驗過,配置繁瑣,文檔稀少,要是數據庫結構復雜一點,導入是個勞心勞神的活計,所以并不推薦。其實ES在各個語言都有對應的API庫,你在語言層面把數據組裝成json,通過API庫發送到ES即可。流程大致如下:
我使用的是Golang的ES庫elastic,其它語言可以去github上自行搜索,操作的方式都是一樣的。
接下來使用一個簡單的數據庫做介紹:
Paper表
id | name |
---|---|
1 | 北京第一小學模擬卷 |
2 | 江西北京通用高考真題 |
Province表
id | name |
---|---|
1 | 北京 |
2 | 江西 |
Paper_Province表
paper_id | province_id |
---|---|
1 | 1 |
2 | 1 |
2 | 2 |
如上,Paper和Province是多對多關系,現在把Paper數據打入ES,,可以按Paper名稱模糊搜索,也可通過Province進行篩選。json數據格式如下:
1
2
3
4
5
6
7
8
9
10
|
{ "id" :1, "name" : "北京第一小學模擬卷" , "provinces" :[ { "id" :1, "name" : "北京" } ] } |
首先準備一份mapping.json文件,這是在ES中數據的存儲結構定義,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
{ "mappings" :{ "docs" :{ "include_in_all" : false , "properties" :{ "id" :{ "type" : "long" }, "name" :{ "type" : "text" , "analyzer" : "ik_max_word" // 使用最大詞分詞器 }, "provinces" :{ "type" : "nested" , "properties" :{ "id" :{ "type" : "integer" }, "name" :{ "type" : "text" , "index" : "false" // 不索引 } } } } } }, "settings" :{ "number_of_shards" :1, "number_of_replicas" :0 } } |
需要注意的是取消_all字段,這個默認的_all會收集所有的存儲字段,實現無條件限制的搜索,缺點是空間占用大。
shard(分片)數我設置為了1,沒有設置replicas(副本),畢竟這不是一個集群,處理的數據也不是很多,如果有大量數據需要處理可以自行設置分片和副本的數量。
首先與ES建立連接,ca.crt與jks自簽名有關。當然,在這里我使用InsecureSkipVerify忽略了證書文件的驗證。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
func InitElasticSearch() { pool := x509.NewCertPool() crt, err0 := ioutil.ReadFile( "conf/ca.crt" ) if err0 != nil { cannotOpenES(err0, "read crt file err" ) return } pool.AppendCertsFromPEM(crt) tr := &http.Transport{ TLSClientConfig: &tls.Config{RootCAs: pool, InsecureSkipVerify: true }, } httpClient := &http.Client{Transport: tr} //后臺構造elasticClient var err error elasticClient, err = elastic.NewClient(elastic.SetURL(MyConfig.ElasticUrl), elastic.SetErrorLog(GetLogger()), elastic.SetGzip( true ), elastic.SetHttpClient(httpClient), elastic.SetSniff( false ), // 集群嗅探,單節點記得關閉。 elastic.SetScheme( "https" ), elastic.SetBasicAuth(MyConfig.ElasticUsername, MyConfig.ElasticPassword)) if err != nil { cannotOpenES(err, "search_client_error" ) return } //elasticClient構造完成 //查詢是否有paper索引 exist, err := elasticClient.IndexExists(MyConfig.ElasticIndexName).Do(context.Background()) if err != nil { cannotOpenES(err, "exist_paper_index_check" ) return } //索引存在且通過完整性檢查則不發送任何數據 if exist { if !isIndexIntegrity(elasticClient) { //刪除當前索引  準備重建 deleteResponse, err := elasticClient.DeleteIndex(MyConfig.ElasticIndexName).Do(context.Background()) if err != nil || !deleteResponse.Acknowledged { cannotOpenES(err, "delete_index_error" ) return } } else { return } } //后臺查詢數據庫,發送數據到elasticsearch中 go fetchDBGetAllPaperAndSendToES() } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
type PaperSearch struct { PaperId int64 `gorm: "primary_key;column:F_paper_id;type:BIGINT(20)" json: "id" ` Name string `gorm: "column:F_name;size:80" json: "name" ` Provinces []Province `gorm: "many2many:t_paper_province;" json: "provinces" ` // 試卷適用的省份 } func fetchDBGetAllPaperAndSendToES() { //fetch paper var allPaper []PaperSearch GetDb().Table( "t_papers" ).Find(&allPaper) //province for i := range allPaper { var allPro []Province GetDb().Table( "t_provinces" ).Joins( "INNER JOIN `t_paper_province` ON `t_paper_province`.`province_F_province_id` = `t_provinces`.`F_province_id`" ). Where( "t_paper_province.paper_F_paper_id = ?" , allPaper[i].PaperId).Find(&allPro) allPaper[i].Provinces = allPro } if len(allPaper) > 0 { //send to es - create index createService := GetElasticSearch().CreateIndex(MyConfig.ElasticIndexName) // 此處的index_default_setting就是上面mapping.json中的內容。 createService.Body(index_default_setting) createResult, err := createService.Do(context.Background()) if err != nil { cannotOpenES(err, "create_paper_index" ) return } if !createResult.Acknowledged || !createResult.ShardsAcknowledged { cannotOpenES(err, "create_paper_index_fail" ) } // - send all paper bulkRequest := GetElasticSearch().Bulk() for i := range allPaper { indexReq := elastic.NewBulkIndexRequest().OpType( "create" ).Index(MyConfig.ElasticIndexName).Type( "docs" ). Id(helper.Int64ToString(allPaper[i].PaperId)). Doc(allPaper[i]) bulkRequest.Add(indexReq) } // Do sends the bulk requests to Elasticsearch bulkResponse, err := bulkRequest.Do(context.Background()) if err != nil { cannotOpenES(err, "insert_docs_error" ) return } // Bulk request actions get cleared if len(bulkResponse.Created()) != len(allPaper) { cannotOpenES(err, "insert_docs_nums_error" ) return } //send success } } |
跑通上面的代碼后,使用{{url}}/_cat/indices?v看看ES中是否出現了新創建的索引,使用{{url}}/papers/_search看看命中了多少文檔,如果文檔數等于你發送過去的數據量,搜索服務就算跑起來了。
搜索
現在就可以通過ProvinceID和q來搜索試卷,默認按照相關度評分排序。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
//q 搜索字符串 provinceID 限定省份id limit page 分頁參數 func SearchPaper(q string, provinceId uint, limit int , page int ) (list []PaperSearch, totalPage int , currentPage int , pageIsEnd int , returnErr error) { //不滿足條件,使用數據庫搜索 if !CanUseElasticSearch && !MyConfig.UseElasticSearch { return SearchPaperLocal(q, courseId, gradeId, provinceId, paperTypeId, limit, page) } list = make([]PaperSimple, 0 ) totalPage = 0 currentPage = page pageIsEnd = 0 returnErr = nil client := GetElasticSearch() if client == nil { return SearchPaperLocal(q, courseId, gradeId, provinceId, paperTypeId, limit, page) } //ElasticSearch有問題,使用數據庫搜索 if !isIndexIntegrity(client) { return SearchPaperLocal(q, courseId, gradeId, provinceId, paperTypeId, limit, page) } if !client.IsRunning() { client.Start() } defer client.Stop() q = html.EscapeString(q) boolQuery := elastic.NewBoolQuery() // Paper.name matchQuery := elastic.NewMatchQuery( "name" , q) //省份 if provinceId > 0 && provinceId != DEFAULT_PROVINCE_ALL { proBool := elastic.NewBoolQuery() tpro := elastic.NewTermQuery( "provinces.id" , provinceId) proNest := elastic.NewNestedQuery( "provinces" , proBool.Must(tpro)) boolQuery.Must(proNest) } boolQuery.Must(matchQuery) for _, e := range termQuerys { boolQuery.Must(e) } highligt := elastic.NewHighlight() highligt.Field(ELASTIC_SEARCH_SEARCH_FIELD_NAME) highligt.PreTags(ELASTIC_SEARCH_SEARCH_FIELD_TAG_START) highligt.PostTags(ELASTIC_SEARCH_SEARCH_FIELD_TAG_END) searchResult, err2 := client.Search(MyConfig.ElasticIndexName). Highlight(highligt). Query(boolQuery). From((page - 1 ) * limit). Size(limit). Do(context.Background()) if err2 != nil { // Handle error GetLogger().LogErr( "搜索時出錯 " +err2.Error(), "search_error" ) // Handle error returnErr = errors.New( "搜索時出錯" ) } else { if searchResult.Hits.TotalHits > 0 { // Iterate through results for _, hit := range searchResult.Hits.Hits { var p PaperSearch err := json.Unmarshal(*hit.Source, &p) if err != nil { // Deserialization failed GetLogger().LogErr( "搜索時出錯 " +err.Error(), "search_deserialization_error" ) returnErr = errors.New( "搜索時出錯" ) return } if len(hit.Highlight[ELASTIC_SEARCH_SEARCH_FIELD_NAME]) > 0 { p.Name = hit.Highlight[ELASTIC_SEARCH_SEARCH_FIELD_NAME][ 0 ] } list = append(list, p) } count := searchResult.TotalHits() currentPage = page if count > 0 { totalPage = int (math.Ceil(float64(count) / float64(limit))) } if currentPage >= totalPage { pageIsEnd = 1 } } else { // No hits } } return } |
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:https://juejin.im/post/5a7bbb176fb9a063475f6cc7