前言
玩過Hadoop的小伙伴對MapReduce應該不陌生,MapReduce的強大且靈活,它可以將一個大問題拆分為多個小問題,將各個小問題發送到不同的機器上去處理,所有的機器都完成計算后,再將計算結果合并為一個完整的解決方案,這就是所謂的分布式計算。本文我們就來看看MongoDB中MapReduce的使用。
打算用mongodb mapreduce之前一定要知道的事!!!
mapreduce其實是分批處理數據的,每一百次重新reduce處理,所以到reduce里的數據如果是101條,那就會分2次進入。
這導致的問題就是在reduce中 如果 初始化 var count = 0;
在循環中 count ++,最后輸出的是1???
避免都方法是,把數據存在返回的value里,這個value是會在循環進入reduce的時候重用的。在循環中 count += value.count
就能把之前都100加上了!!!
還有如果只有一條數據,那它不會進入reduce,會直接返回。
下面是具體例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
string map = @ " function() { var view = this; emit(view.activity, {pv: 1}); }" ; string reduce = @ " function(key, values) { var result = {pv: 0}; values.forEach(function(value){ result.pv += value.pv; }); return result; }" ; string finalize = @ " function(key, value){ return value; }" ; |
mapReduce
MongoDB中的MapReduce可以用來實現更復雜的聚合命令,使用MapReduce主要實現兩個函數:map函數和reduce函數,map函數用來生成鍵值對序列,map函數的結果作為reduce函數的參數,reduce函數中再做進一步的統計,比如我的數據集如下:
1
2
3
4
5
|
{"_id" : ObjectId("59fa71d71fd59c3b2cd908d7"),"name" : "魯迅","book" : "吶喊","price" : 38.0,"publisher" : "人民文學出版社"} {"_id" : ObjectId("59fa71d71fd59c3b2cd908d8"),"name" : "曹雪芹","book" : "紅樓夢","price" : 22.0,"publisher" : "人民文學出版社"} {"_id" : ObjectId("59fa71d71fd59c3b2cd908d9"),"name" : "錢鐘書","book" : "宋詩選注","price" : 99.0,"publisher" : "人民文學出版社"} {"_id" : ObjectId("59fa71d71fd59c3b2cd908da"),"name" : "錢鐘書","book" : "談藝錄","price" : 66.0,"publisher" : "三聯書店"} {"_id" : ObjectId("59fa71d71fd59c3b2cd908db"),"name" : "魯迅","book" : "彷徨","price" : 55.0,"publisher" : "花城出版社"} |
假如我想查詢每位作者所出的書的總價,操作如下:
1
2
3
4
5
|
var map= function (){emit( this .name, this .price)} var reduce= function (key,value){ return Array.sum(value)} var options={out: "totalPrice" } db.sang_books.mapReduce(map,reduce,options); db.totalPrice.find() |
emit函數主要用來實現分組,接收兩個參數,第一個參數表示分組的字段,第二個參數表示要統計的數據,reduce來做具體的數據處理操作,接收兩個參數,對應emit方法的兩個參數,這里使用了Array中的sum函數對price字段進行自加處理,options中定義了將結果輸出的集合,屆時我們將在這個集合中去查詢數據,默認情況下,這個集合即使在數據庫重啟后也會保留,并且保留集合中的數據。
查詢結果如下:
1
2
3
4
5
6
7
8
9
10
11
12
|
{ "_id" : "曹雪芹" , "value" : 22.0 } { "_id" : "錢鐘書" , "value" : 165.0 } { "_id" : "魯迅" , "value" : 93.0 } |
再比如我想查詢每位作者出了幾本書,如下:
1
2
3
4
5
|
var map= function (){emit( this .name,1)} var reduce= function (key,value){ return Array.sum(value)} var options={out: "bookNum" } db.sang_books.mapReduce(map,reduce,options); db.bookNum.find() |
查詢結果如下:
1
2
3
4
5
6
7
8
9
10
11
12
|
{ "_id" : "曹雪芹", "value" : 1.0 } { "_id" : "錢鐘書", "value" : 2.0 } { "_id" : "魯迅", "value" : 2.0 } |
將每位作者的書列出來,如下:
1
2
3
4
5
|
var map= function (){emit( this .name, this .book)} var reduce= function (key,value){ return value.join( ',' )} var options={out: "books" } db.sang_books.mapReduce(map,reduce,options); db.books.find() |
結果如下:
1
2
3
4
5
6
7
8
9
10
11
12
|
{ "_id" : "曹雪芹", "value" : "紅樓夢" } { "_id" : "錢鐘書", "value" : "宋詩選注,談藝錄" } { "_id" : "魯迅", "value" : "吶喊,彷徨" } |
比如查詢每個人售價在¥40以上的書:
1
2
3
4
5
|
var map= function (){emit( this .name, this .book)} var reduce= function (key,value){ return value.join( ',' )} var options={query:{price:{$gt:40}},out: "books" } db.sang_books.mapReduce(map,reduce,options); db.books.find() |
query表示對查到的集合再進行篩選。
結果如下:
1
2
3
4
5
6
7
8
|
{ "_id" : "錢鐘書", "value" : "宋詩選注,談藝錄" } { "_id" : "魯迅", "value" : "彷徨" } |
runCommand實現
我們也可以利用runCommand命令來執行MapReduce。格式如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
db.runCommand( { mapReduce: <collection>, map: < function >, reduce: < function >, finalize: < function >, out: <output>, query: <document>, sort: <document>, limit: <number>, scope: <document>, jsMode: <boolean>, verbose: <boolean>, bypassDocumentValidation: <boolean>, collation: <document> } ) |
含義如下:
參數 | 含義 |
---|---|
mapReduce | 表示要操作的集合 |
map | map函數 |
reduce | reduce函數 |
finalize | 最終處理函數 |
out | 輸出的集合 |
query | 對結果進行過濾 |
sort | 對結果排序 |
limit | 返回的結果數 |
scope | 設置參數值,在這里設置的值在map、reduce、finalize函數中可見 |
jsMode | 是否將map執行的中間數據由javascript對象轉換成BSON對象,默認為false |
verbose | 是否顯示詳細的時間統計信息 |
bypassDocumentValidation | 是否繞過文檔驗證 |
collation | 其他一些校對 |
如下操作,表示執行MapReduce操作并對統計的集合限制返回條數,限制返回條數之后再進行統計操作,如下:
1
2
3
4
|
var map= function (){emit( this .name, this .book)} var reduce= function (key,value){ return value.join( ',' )} db.runCommand({mapreduce: 'sang_books' ,map,reduce,out: "books" ,limit:4,verbose: true }) db.books.find() |
執行結果如下:
1
2
3
4
5
6
7
8
9
10
11
12
|
{ "_id" : "曹雪芹", "value" : "紅樓夢" } { "_id" : "錢鐘書", "value" : "宋詩選注,談藝錄" } { "_id" : "魯迅", "value" : "吶喊" } |
小伙伴們看到,魯迅有一本書不見了,就是因為limit是先限制集合返回條數,然后再執行統計操作。
finalize操作表示最終處理函數,如下:
1
2
3
4
5
|
var f1 = function (key,reduceValue){ var obj={};obj.author=key;obj.books=reduceValue; return obj} var map= function (){emit( this .name, this .book)} var reduce= function (key,value){ return value.join( ',' )} db.runCommand({mapreduce: 'sang_books' ,map,reduce,out: "books" ,finalize:f1}) db.books.find() |
f1第一個參數key表示emit中的第一個參數,第二個參數表示reduce的執行結果,我們可以在f1中對這個結果進行再處理,結果如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
{ "_id" : "曹雪芹", "value" : { "author" : "曹雪芹", "books" : "紅樓夢" } } { "_id" : "錢鐘書", "value" : { "author" : "錢鐘書", "books" : "宋詩選注,談藝錄" } } { "_id" : "魯迅", "value" : { "author" : "魯迅", "books" : "吶喊,彷徨" } } |
scope則可以用來定義一個在map、reduce和finalize中都可見的變量,如下:
1
2
3
4
5
|
var f1 = function (key,reduceValue){ var obj={};obj.author=key;obj.books=reduceValue;obj.sang=sang; return obj} var map= function (){emit( this .name, this .book)} var reduce= function (key,value){ return value.join( ',--' +sang+ '--,' )} db.runCommand({mapreduce: 'sang_books' ,map,reduce,out: "books" ,finalize:f1,scope:{sang: "haha" }}) db.books.find() |
執行結果如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
{ "_id" : "曹雪芹", "value" : { "author" : "曹雪芹", "books" : "紅樓夢", "sang" : "haha" } } { "_id" : "錢鐘書", "value" : { "author" : "錢鐘書", "books" : "宋詩選注,--haha--,談藝錄", "sang" : "haha" } } { "_id" : "魯迅", "value" : { "author" : "魯迅", "books" : "吶喊,--haha--,彷徨", "sang" : "haha" } } |
好了,MongoDB中的MapReduce我們就先說到這里,小伙伴們有問題歡迎留言討論。
總結
以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,如果有疑問大家可以留言交流,謝謝大家對服務器之家的支持。
參考資料:
1.《MongoDB權威指南第2版》
3.mongoDB--mapreduce用法詳解(未找到原始出處)
原文鏈接:https://segmentfault.com/a/1190000012319667