在MongoDB中,有两种方式计算聚合:Pipeline 和 MapReduce。Pipeline查询速度快于MapReduce,可是MapReduce的强大之处在于可以在多台Server上并行执行复杂的聚合逻辑。MongoDB不容许Pipeline的单个聚合操做占用过多的系统内存,若是一个聚合操做消耗20%以上的内存,那么MongoDB直接中止操做,并向客户端输出错误消息。mongodb
一,使用 Pipeline 方式计算聚合数组
Pipeline 方式使用db.collection.aggregate()函数进行聚合运算,运算速度较快,操做简单,可是,Pipeline方式有两个限制:单个聚合操做消耗的内存不能超过20%,聚合操做返回的结果集必须限制在16MB之内。ide
建立示例数据,在集合 foo中插入1000条doc,每一个doc中有三个field:idx,name 和 age。函数
for(i=0;i<10000;i++) { db.foo.insert({"idx":i,name:"user "+i,age:i%90}); }
1,使用$match 管道符过滤collection中doc,使符合条件的doc进入pipeline,可以减小聚合操做消耗的内存,提升聚合的效率。this
db.foo.aggregate({$match:{age:{$lte:25}}})
2,使用$project 管道符,使用doc中的部分field进入下级pipelinespa
db.foo.aggregate( {$match:{age:{$lte:25}}}, {$project:{age:1,idx:1,"_id":0}} )
$project 管道符的做用是选择字段,重命名字段,派生字段。 code
2.1 选择字段对象
在$project 管道符中,field:1/0,表示选择/不选择 field;将无用的字段从pipeline中过滤掉,可以减小聚合操做对内存的消耗。blog
db.foo.aggregate( {$match:{age:{$lte:25}}}, {$project:{age:1,idx:1,"_id":0}} )
2.2 对字段重命名,产生新的字段排序
引用符$,格式是:"$field",表示引用doc中 field 的值,若是要引用内嵌 doc中的字段,使用 "$field1.filed2",表示引用内嵌文档field1中的字段:field2的值。
示例,新建一个field:preIdx,其值和idx 字段的值是相同的。
db.foo.aggregate( {$match:{age:{$lte:25}}}, {$project:{age:1,"preIdx":"$idx",idx:1,"_id":0}} )
2.3 派生字段
在$project中,对字段进行计算,根据doc中的字段值和表达式,派生一个新的字段。
示例,preIdx是根据当前doc的idx 减1 获得的
db.foo.aggregate( {$match:{age:{$lte:25}}}, {$project: { age:1, "preIdx":{$subtract:["$idx",1]}, idx:1, "_id":0} } )
在$project 执行算术运算的操做符:+($add),*($multiply),/($divide),%($mod),-($subtract)。
对于字符数据,$substr:[expr,start,length]用于求子字符串;$concat:[expr1,expr2,,,exprn],用于将表达式链接在一块儿;$toLower:expr 和 $toUpper:expr用于返回expr的小写或大写形式。
2.4 分组操做
使用$group将doc按照特定的字段的值进行分组,$group将分组字段的值相同的doc做为一个分组进行聚合计算。若是没有$group 管道符,那么全部doc做为一个分组。对每个分组,都能根据业务逻辑须要计算特定的聚合值。分组操做和排序操做都是非流式的运算符,流式运算符是指:只要有新doc进入,就能够对doc进行处理,而非流式运算符是指:必须等收到全部的文档以后,才能对文档进行处理。分组运算符的处理方式是等接收到全部的doc以后,才能对doc进行分组,而后将各个分组发送给pipeline的下一个运算符进行处理。
示例,按照age进行分组,统计每一个分组中的doc数量
db.foo.aggregate( {$match:{age:{$lte:25}}}, {$project:{age:1,"preIdx":{$subtract:["$idx",1]},idx:1,"_id":0}} , {$group:{"_id":"$age",count:{$sum:1}}} )
若是分组字段有多个,按照 age 和 age2 进行分组,这样作仅仅是为了演示,在实际的产品环境中,可使用更多的字段用来分组。
db.foo.aggregate( {$match:{age:{$lte:25}}}, {$project:{age:1,"preIdx":{$subtract:["$idx",1]},idx:1,"_id":0}} , {$group:{"_id":{age:"$age",age2:"$age"},count:{$sum:1}}} )
对每一个分组进行聚合运算,count字段是计算每一个分组中doc的数量,idxTotal字段是计算每一个分组中idx字段值的加和,idxMax字段是计算每一个分组中idx字段值的最大值,idxFirst是计算每一个分组中第一个idx 字段的值,不必定是最小的。
db.foo.aggregate( {$match:{age:{$lte:25}}}, {$project:{age:1,"preIdx":{$subtract:["$idx",1]},idx:1,"_id":0}} , {$group: { "_id":{age:"$age",age2:"$age"}, count:{$sum:1}, idxTotal:{$sum:"$idx"}}, idxMax:{$max:"$idx"}, idxFirst:{$first:"$idx"} }
} )
2.5,sort操做,limit操做 和 skip操做
对聚合操做的结果进行排序,而后跳过前10个doc,取剩余结果集的前10个doc。
db.foo.aggregate( {$match:{age:{$lte:25}}}, {$project:{age:1,"preIdx":{$subtract:["$idx",1]},idx:1,"_id":0}} , {$group: { "_id":{age:"$age",age2:"$age"}, count:{$sum:1}, idxTotal:{$sum:"$idx"}}, idxMax:{$max:"$idx"}, idxFirst:{$first:"$idx"} } }, {$sort:{age:-1}}, {$skip:10}, {$limit:10} )
二,使用MapReduce 方式计算聚合
MapReduce 可以计算很是复杂的聚合逻辑,很是灵活,可是,MapReduce很是慢,不该该用于实时的数据分析中。MapReduce可以在多台Server上并行执行,每台Server只负责完成一部分wordload,最后将wordload发送到Master Server上合并,计算出最终的结果集,返回客户端。
MapReduce分为两个阶段:Map和Reduce,举个例子说明,有10节车箱,统计这10节车箱中男生和女生的数量。串行方式一节一节车箱的统计,直到统计彻底部车箱中的人数:男50人,女40人。
使用MapReduce方式的思路是:每一个车箱派一我的去统计,每一个人返回一个doc,例如,keyN:{female:num1,male:num2},keyN是车箱编号,在同一时间,有10我的在同时工做,每一个人只完成所有workload的10%,很快,返回10个doc,从Key1到Key10,只须要将这10个doc中 femal 和 male分别加和到一块儿,就是所有车箱的人数:男50人,女40人。
使用MapReduce方式计算聚合,主要分为三步:Map,Shuffle(拼凑)和Reduce,Map和Reduce须要显式定义,shuffle由MongoDB来实现。
使用MapReduce进行聚合运算的最佳方式是聚合运算的结果可以加到一块儿,例如,求最大值/最小值,sum,平均值(转换为计算每台Server的 总和sum1,sum2,,,sumN 与 num1,num2,,numN,平均值avg=(sum1+sum2+,,,+sumN)/(num1+num2+,,+numN))等。
示例,使用MapReduce模拟Count,统计集合中的doc的数量
step1,定义Map函数和reduce函数
对于每一个doc,直接返回key 和 一个doc:{count:1}
map=function (){ for(var key in this) { emit(key,{count:1}); } } reduce=function (key,emits){ total=0; for(var i in emits){ total+=emits[i].count; } return {"count":total}; }
step2,执行MapReduce运算
在集合 foo上执行MapReduce运算,返回mr 对象
mr=db.runCommand( { "mapreduce":"foo", "map":map, "reduce":reduce, out:"Count Doc" })
step3,查看MapReduce计算的结果
db[mr.result].find()
示例2,统计集合foo中不一样age的数量
step1,定义Map 和 Reduce函数
Map函数的做用是对每一个doc进行一次映射,返回age 和 {count:1};
通过Shuffle,每一个age都有一个列表:[{count:1},{count:1},{count:1},{count:1},,,,,],有多少个不一样的age,MongoDB都会调用多少次Reduce函数,每次调用时,Key值是不一样的。
Reduce函数的做用:对MongoDB的一次调用,对age对应的列表进行聚合运算。
map=function () { emit(this.age,{count:1}); } reduce= function (key,emits) { total=0; for(var i in emits) { total+=emits[i].count; } return {"age":key,count:total}; }
step2,执行MapReduce聚合运算
mr=db.runCommand( { "mapreduce":"foo", "map":map, "reduce":reduce, out:"Count Doc" })
step3,查看聚合运算的结果
db[mr.result].find()
示例3,研究reduce函数的特性
reduce函数具备累加的特性,经过屡次调用,可以产生最终的累加值,例如,如下reduce函数对于任意一个特定的key,reduce都能计算key的数量
reduce= function (key,emits) { total=0; for(var i in emits) { total+=emits[i].count; } return {"key":key,count:total}; }
调用示例:传递的Key是相同的,都是“x”,每一个emits都是一个数组,反复调用reduce函数,最终得到key的累加值。
r1=reduce("x",[{count:1},{count:2}]) r2=reduce("x",[{count:3},{count:5}]) r3=reduce("x",[r1,r2])
参考doc: