聚合
聚合用于处理多个文档数据,并输出计算结果。在mongodb中,聚合可以完成以下事情:
- 按照指定值对文档数据分组
- 对分组数据进行处理并返回单个结果
- 随着时间变化分析数据
在mongodb中,支持三种方式对聚合的实现,
- 聚合管道(Aggregation Pipeline)
- Map-Reduce
- 单目标聚合操作(Single Operation)
1. 聚合管道(Aggregation Pipeline)
一个管道中可以包含多个操作节点,每个阶段具有如下特点:
- 每个阶段为一个操作,每个操作是对输入文档文档进行处理。每个阶段中可以包含了filter, group等操作
- 当一个管道中包含了多个阶段时,上一个阶段的输出为下一个阶段的输入
- 一个聚合管道能够对分组数据返回一个单一的结果值。例如总数,平均值等
聚合操作并不会改变文档中的数据,除非聚合操作中包含了
$merge
或者$set
等阶段。
1.1 实例展示
下面就以官方的例子展示聚合的基础用法,这里以订单数据为例子,输入数据:
db.orders.insertMany( [ { _id: 0, productName: "Steel beam", status: "new", quantity: 10 }, { _id: 1, productName: "Steel beam", status: "urgent", quantity: 20 }, { _id: 2, productName: "Steel beam", status: "urgent", quantity: 30 }, { _id: 3, productName: "Iron rod", status: "new", quantity: 15 }, { _id: 4, productName: "Iron rod", status: "urgent", quantity: 50 }, { _id: 5, productName: "Iron rod", status: "urgent", quantity: 10 } ] )
我们以管道的方式实现按照产品名称分组,并计算每个产品的数量。
db.orders.aggregate( [ { $match: { status: "urgent" } }, { $group: { _id: "$productName", sumQuantity: { $sum: "$quantity" } } } ] )
在这个操作中包含了两个阶段,
$match
- 该阶段从文档数据中过滤状态为
urgent
的文档 - 并将符合条件的文档输出到
$group
阶段中
- 该阶段从文档数据中过滤状态为
$group
分组数据- 按照
productName
进行分组 - 将分组的数据
quantity
数据相加,并保存到sumQuantity
字段中
- 按照
则对应的输出结果为:
[ { _id: 'Steel beam', sumQuantity: 50 }, { _id: 'Iron rod', sumQuantity: 60 } ]
1.2 zipcode实例
zipcode展示了聚合的另一种使用方法,数据文件可以下载进行使用。城市数据
1.2.1 数据结构
在以上导入的数据中,每条数据都包含了以下的字段:
{ "_id": "10280", "city": "NEW YORK", "state": "NY", "pop": 5574, "loc": [ -74.016323, 40.710537 ] }
_id
字段记录了城市的编码city
字段记录了城市的名称state
记录了州的缩写pop
记录了这座城市的人口loc
记录了城市的坐标,记录了经度和维度
1.2.2 返回人口超过1千万的城市
则具体的查询方式为
db.zipcodes.aggregate( [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } }, { $match: { totalPop: { $gte: 10*1000*1000 } } } ] )
在以上的查询中包含了两个stage,
$group
- 首先按照state对文档数据进行分组
- 然后对分组文档数据中的pop字段使用
$sum
求和,并将求和数据保存在totalPop字段中 - 将生成的文档数据输出到
$match
阶段中
$match
- 该阶段主要对文档进行过滤,只返回总人口totalPop超过一千万的城市
在$group
阶段中生成的文档输入如下:
{ "_id" : "AK", "totalPop" : 550043 }
在以上的操作中,就相当于执行SQL
select state, sum(pop) as totalPop from zipcodes group by state having totalPop >= (10 * 1000 * 1000)
在执行上面的聚合操作后,则对应的输出结果为:
[ { _id: 'PA', totalPop: 11881643 }, { _id: 'IL', totalPop: 11427576 }, { _id: 'FL', totalPop: 12686644 }, { _id: 'OH', totalPop: 10846517 }, { _id: 'TX', totalPop: 16984601 }, { _id: 'NY', totalPop: 17990402 }, { _id: 'CA', totalPop: 29754890 } ]
1.2.3 返回城市的平均人口
这里用于统计州的平均人口数量,这里就需要统计每个州下的城市数据,以及城市的人口数据,则对应的聚合为:
db.zipcodes.aggregate( [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } }, { $group: { _id: "$_id.state", avgCityPop: { $avg: "$pop" } } } ] )
在以上操作中包含了两个阶段,都是与$group
来实现,
$group
- 第一个group操作根据state和city两个字段分组,、
- 然后根据分组情况统计人口的总数,通过
$sum
来实现人口的总数统计
-
此时我们看到的数据结构如下:
{ "_id" : { "state" : "CO", "city" : "EDGEWATER" }, "pop" : 13
- 第二个
$group
- 第二个group接收到第一个group的输入文档数据,再次按照state进行分组
- 并通过
$avg
的操作对pop
属性求平均值
在聚合管道中,当我们需要取文档中的字段值时,则根据
$
表达式进行获取
1.2.4 获取每个州人口最多和最少的城市已经人口数量
db.zipcodes.aggregate( [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } }, { $sort: { pop: 1 } }, { $group: { _id : "$_id.state", biggestCity: { $last: "$_id.city" }, biggestPop: { $last: "$pop" }, smallestCity: { $first: "$_id.city" }, smallestPop: { $first: "$pop" } } }, { $project: { _id: 0, state: "$_id", biggestCity: { name: "$biggestCity", pop: "$biggestPop" }, smallestCity: { name: "$smallestCity", pop: "$smallestPop" } } } ] )
在以上的操作总,包含了四个阶段的操作:
$group
- 按照state和city进行分组
- 并对分组内的人口pop进行求和
该步骤完成后,对应的数据结构为:
{ "_id" : { "state" : "CO", "city" : "EDGEWATER" }, "pop" : 13154 }
$sort
- 是对上一步的分组求和的结果进行排序
pop
指定了排序的规则- 1 – 正序
- -1 – 倒序
$group
- 该group中按照state字段进行分组
$last
是获取分组中的最后一个文档数据$first
获取文档中的第一个元素
该步骤在完成后,则对应的数据结构为
{ "_id" : "WA", "biggestCity" : "SEATTLE", "biggestPop" : 520096, "smallestCity" : "BENGE", "smallestPop" : 2 }
$project
该阶段是作为可选阶段,可以对返回的文档数据进行重写- 在这个阶段中,对返回的数据格式进行了改写。则这个阶段返回的结果数据为
此时则对应的返回结果为:
{ biggestCity: { name: 'PORTLAND', pop: 518543 }, smallestCity: { name: 'LYONS', pop: 0 }, state: 'OR' }
1.3 聚合管道限制
在聚合管道中,是存在一些限制的,主要包括一下两点:
1.3.1 结果大小限制
mongodb中对于单个文档的大小是有限制的,目前打个文档大小为16m
. 如果单个文档超过了BSON的16m
限制,聚合管道将会报错。但是这个限制只是针对返回的文档大小,在聚合管道执行的过程中,是完全可能超过这个限制的。在db.collection.aggregation()
方法中,默认返回的是一个cursor
对象。
1.3.2 内存大小限制
聚合管道也是有内存大小限制的,默认每隔管道使用的最大内存为100m
, 当超过这个限制的时候,聚合管道将会报错。当管道使用的内存超过内存限制时,可以通过allowDiskUse
参数将管道输入写入到磁盘临时文件中。
$search
操作不受100m内存的限制,因为该操作是以单独的进程执行
以下操作在指定allowDiskUse=true
的时候,会将数据写入到磁盘中:
$bucket
$bucketAuto
$group
$sort
只有排序操作不支持索引时$sortByCount
聚合管理以流式方式获取文档,处理文档,然后返回文档,但是有的阶段并不会马上返回文档,而是会等到所有文档都处理完成后一起输出,这时数据是存储在内存中,这样的话,内存中的文档大小是完全可能超过100m的。
如果一个$sort
的操作的文档大小超过100m的时候,建议与$limit
阶段一起使用。
1.3.4 聚合管道与集合分片
聚合管道支持在分片集合上进行操作,不过这种操作具有一定的前提。主要包含两种情况
- 如果聚合分片在
$match
操作上明确指定了shard key
信息,并且不包含$out
,$lookup
阶段,则整个聚合管道在指定的分片上执行 - 如果聚合管道在多个分片上执行,则需要在
mongos
上进行数据的合并,主要包含一下两种情况:- 如果聚合管道中包含了
$out
和$lookup
阶段操作,则合并操作必须在主分片(primary shard)上进行 - 如果聚合管道中包含了
$sort
和$group
阶段并且allowDiskUse=true
,此时数据合并将会在随机分片上进行。
- 如果聚合管道中包含了
在聚合管道在多分片上执行的时候,本身会设计到聚合管道的优化,会将管道换分为两部分,然后尽可能多的在多分片上并行执行管道中的阶段,已达到优化效果。
1.4 管道优化
在mongodb中,本身存在着对管道的优化,通过重塑管道,以提升管道的执行性能。为了能够看到mongodb对于管道的优化,我们可以在db.collection.aggregation()
方法中加入explain
参数,以查看mongodb对于管道的优化信息。
1.4.1 Projection优化
在聚合中,mongodb会分析使用的字段是否只是文档字段的一部分,当整个管道只是使用部分字段时,mongodb将不会获取文档的整个字段列表,而是根据需要获取字段,减少管道中的数据传输量。
1.4.2 管道顺序优化
1.4.2.1 ($project/$unset/$addFields/$set
) + $match
当在一个管道中,如果project/unset/addFields/set
后面跟了$match
的阶段,此时mongodb会将$match
中未参加计算的字段创建一个新的$match
阶段到projection
阶段的前面。
如果在管道中包含了多个projection
的阶段和$match
阶段,这时会将所有$match
中未参与计算的字段形成新的$match
, 并防止到所有的projection
前面。
例如有一下管道操作:
{ $addFields: { maxTime: { $max: "$times" }, minTime: { $min: "$times" } } }, { $project: { _id: 1, name: 1, times: 1, maxTime: 1, minTime: 1, avgTime: { $avg: ["$maxTime", "$minTime"] } } }, { $match: { name: "Joe Schmoe", maxTime: { $lt: 20 }, minTime: { $gt: 5 }, avgTime: { $gt: 7 } } }
在以上的查询中,mongodb将把$match
操作进行拆分,然后进可能的将新创建的$match
放到单独的过滤条件中,并穿插到不同的projection
语法前面。则可能最终的管道语句为:
{ $match: { name: "Joe Schmoe" } }, { $addFields: { maxTime: { $max: "$times" }, minTime: { $min: "$times" } } }, { $match: { maxTime: { $lt: 20 }, minTime: { $gt: 5 } } }, { $project: { _id: 1, name: 1, times: 1, maxTime: 1, minTime: 1, avgTime: { $avg: ["$maxTime", "$minTime"] } } }, { $match: { avgTime: { $gt: 7 } } }
- 在
{ avgTime: { $gt: 7 } }
的$match
中,字段avtTime
依赖的是$project
操作的返回的平均时间,因此该$match
操作无法移动,只能放到最后面位置 maxTime
和minTime
依赖了$addFields
操作产生的新字段,但是并不依赖于$project
操作,因此mongodb生成新的$match
操作,并放到了$project
操作的前面。$match
过滤条件{ name: "Joe Schmoe" }
并不依赖于$addFields
和$project
的任何操作,因此该过滤条件放到了所有阶段的前面。
在以上操作中,过滤条件
{ name: "Joe Schmoe" }
放到管道的最前面的好处就在于,在过滤数据的时候,我们能够使用索引过滤文档数据,而不是扫描全部的文档数据。(这建立在name字段创建了索引)
1.4.2.2 $match
+ $sort
$sort
和$match
的配合使用,主要优化点在于优先过滤数据,然后再执行排序。
例如定义管道如下:
{ $sort: { age : -1 } }, { $match: { status: 'A' } }
则在执行的时候,则对应的管道为:
{ $match: { status: 'A' } }, { $sort: { age : -1 } }
这样的顺序交换主要减少查询文档的数量,减少排序的文档数量,对结果并没有任何影响。
1.4.2.3 $redact
+ $match
当聚合管道中$redact
后紧跟$match
操作,mongodb可能会将$match
中的一部分创建新的$match
阶段放置到$redact
前面,如果新增$match
是在管道的开始,就可以使用索引过滤文档,以减少进入到管道中的文档数据。
例如以下管道定义:
{ $redact: { $cond: { if: { $eq: [ "$level", 5 ] }, then: "$$PRUNE", else: "$$DESCEND" } } }, { $match: { year: 2014, category: { $ne: "Z" } } }
则优化器在优化完成后,则管道变更为:
{ $match: { year: 2014 } }, { $redact: { $cond: { if: { $eq: [ "$level", 5 ] }, then: "$$PRUNE", else: "$$DESCEND" } } }, { $match: { year: 2014, category: { $ne: "Z" } } }
1.4.2.4 $project/$unset
+ $skip
从mongodb 3.2版本开始,当$project
或者$unset
后紧跟$skip
阶段时,此时$kip
将会被移动到$project/$unset
阶段前面。
例如有以下管道定义:
{ $sort: { age : -1 } }, { $project: { status: 1, name: 1 } }, { $skip: 5 }
则优化后的管道定义为:
{ $sort: { age : -1 } }, { $skip: 5 }, { $project: { status: 1, name: 1 } }
1.4.3 管道合并优化(Coalescence)
在聚合管道中,可能会将部分的阶段合并到上一个阶段
1.4.3.1 $sort
+$limit
当管道中包含了$sort
和$limit
操作时,此时可能会将$limit
合并到$sort
操作中,但是这个合并是有前提条件的:
- 只有在
$sort
和$limit
操作中没有包含其他可能改变文档数量的操作时(例如$unwind/$group
),才能进行合并操作
例如有以下管道定义:
{ $sort : { age : -1 } }, { $project : { age : 1, status : 1, name : 1 } }, { $limit: 5 }
则优化之后的管道变为:
{ "$sort" : { "sortKey" : { "age" : -1 }, "limit" : NumberLong(5) } }, { "$project" : { "age" : 1, "status" : 1, "name" : 1 } }
当
$sort
和$limit
操作中包含了$skip
阶段时,将$limit
合并到$sort
阶段时,需要加上$skip
的数值。
这样的合并能够减少通过管道的文档数据和保存在内存中的文档数量,这就相当于从1千万个数据中返回最小的五个数,此时内存中只需要维护5个数字即可
1.4.3.2 $limit
+ $limit
当$limit
阶段后面紧跟$limit
操作时,此时两个阶段可以合并,并且取两个$limit
操作中的最小值。
例如有以下管道定义:
{ $limit: 100 }, { $limit: 10 }
则优化之后的管道为:
{ $limit: 10 }
1.4.3.3 $skip
+ $skip
当$skip
后面紧跟$skip
阶段时,两个阶段合并为一个$skip
阶段,并且取两个$skip
数值的和。
例如有一下管道定义:
{ $skip: 5 }, { $skip: 2 }
则优化后的管道为:
{ $skip: 7 }
1.4.3.4 $match
+ $match
当$match
阶段后紧跟$match
操作时,可以合并两个阶段为一个$match
并通过$and
合并两个过滤条件。
例如有一下管道定义:
{ $match: { year: 2014 } }, { $match: { status: "A" } }
则优化后的管道定义为:
{ $match: { $and: [ { "year" : 2014 }, { "status" : "A" } ] } }
1.4.3.5 $lookup
+ $unwind
当$lookup
紧跟$unwind
阶段时,并且$unwind
使用了$lookup
中的as
字段信息,mongodb将合并两个阶段,防止创建大量中间文档数据。
例如有如下管道定义:
{ $lookup: { from: "otherCollection", as: "resultingArray", localField: "x", foreignField: "y" } }, { $unwind: "$resultingArray"}
则优化之后的管道定义为:
{ $lookup: { from: "otherCollection", as: "resultingArray", localField: "x", foreignField: "y", unwinding: { preserveNullAndEmptyArrays: false } } }
1.4.4 索引
在管道中使用索引能够大大的优化管道的性能,因为当查询使用索引的时候,可以大大减少管道处理的文档数量,也能够通过索引返回查询需要的文档。
例如,假如一个管道包含了$match
、$sort
、$group
阶段时,能够从索引获取以下好处:
$match
数据能够快速查询关联的文档数据$sort
索引能够在该阶段返回有序的文档数据- 在具有索引字段上执行
$group
操作时,能够使用$sort
排序快速的执行分组,并且返回所需要的字段值。
在管道中有多个阶段都可以从索引上获取性能上的提升:
$match
当该阶段处于管道开始位置时,能够通过索引快速过滤文档数据$sort
能够从索引中获取排序性能上的提升,但是文档数据不能被$project/$unwind/$group
操作处理- 如果能够满足一下条件,
$group
能够通过索引快速获取到每个分组的第一个元素:- 在
$group
之前,分组文档被$sort
排序过 - 在
$group
字段上包含索引,并且排序与索引字段顺序保持一致时 - 在
$group
中只有$first
一个归集操作
- 在
$geonear
该阶段始终能够使用索引,该阶段必须为管道第一个阶段并且包含了geospatial索引
2. Map-Reduce
聚合管道作为可选择的实现,比Map-Reduce有着更好的性能和使用性。
在Map-Reduce中能够使用聚合管道操作实现。例如
$group
,$merge
在使用Map-Reduce的时候,需要自定义实现处理数据函数,从4.4版本开始,可以通过
$accumulator
和$function
操作定义函数,通过这些操作能使使用js的函数实现自定义的功能。
Map-Reduce是一种数据处理范式,将大量的数据处理为有用的数据结果。为了执行Map-Reduce, mongdb提供了大量的数据库操作命令。
在Map-Reduce操作中,具体包含了一下步骤:
- mongodb根据数据过滤条件获取输入文档
map
操作用于对输入文档进行处理,并输出key-value
数据对- 输出的
key-value
数据对中, 一个key可能会包含多个值,此时可以通过reduce
对数据进行搜集和归档,并输出归集数据 - mongodb在拿到了聚合数据之后,可以将数据存储在一个集合中。或者可以通过
finalize
函数跟进一步的聚合和处理聚合数据结果。
Map-Reduce
中都是javascript的函数并且运行在mongod的进程中,Map-Reduce
以单个集合中的文档作为输入,在执行map
方法之前,可以使用任意的排序和limit的操作,Map-Reduce
能够返回文档作为数据结果,也可以将文档数据写入到集合中。
2.1 Map-Reduce
与集合分片
2.1.2 文档输入
在集合分片上执行Map-Reduce
, mongodb会自动分发Map-Reduce
任务到每个分片上并行的执行,然后将集合分片上的结果汇集到一起。这个过程不需要单独的参数设置,Map-Reduce
会自动的等待所有的分片任务执行完成。
2.1.3 文档输出
只要在mapReduce中的输出的文档值包含了分片的值,mongodb将使用_id
字段作为分片key来使用。
为了能向分片集合输出,有以下情况:
- 如何分片集合不存在,则先创建该集合。从4.2版本开始,map-reduce废弃了创建分片集合的选项设置,转而使用
sharded
选项。
如果集合不存在,则默认会创建集合,并且使用
_id
字段作为分片的key, 但是还是建议提前创建分片集合。 - 从4.2版本开始,废弃了替换已经存在的分片集合的选项。
- 从4.0版本开始,如果集合已经存在,但是没有设置分片,则map-reduce会失败
- 如果分片集合是新创建或者为空集时,map-reduce使用第一阶段产生的文档填充分片集合的初始化块
mongos
分发map-reduce的任务到集合分片节点上,每个分片节点将执行结果回传到其他的分片节点,并执行reduce/finilize
阶段,并将结果输出到指定的集合中。
2.2 Map-Reduce并发控制
Map-Reduce中包含了许多的任务,包括:
- 从集合中读取数据
- 执行map函数
- 执行reduce函数
- 在执行过程中将数据写入到临时集合中
- 将数据写出到目标集合中
在执行的过程中,主要持有了以下锁:
- 在读阶段会有个读锁,每个读锁锁定100个文档
- 将数据写入到临时集合时,对于单个写操作获取写锁
- 如果输出集合不存在,则创建集合的时候获取写锁
- 如果输出集合存在,则写出操作(例如
$merge
,$replace
,$reduce
)获取写锁,这个锁是全局性的,会阻塞在mongod
实例上的所有写操作。
2.3 Map-Reduce实例
在mongo shell中,db.collection.mapReduce()
是对mapReduce
操作的封装,在一下的操作实例中,都是对db.collection.mapReduce()
来实现的。
实例数据源准备如下:
db.orders.insertMany([ { _id: 1, cust_id: "Ant O. Knee", ord_date: new Date("2020-03-01"), price: 25, items: [ { sku: "oranges", qty: 5, price: 2.5 }, { sku: "apples", qty: 5, price: 2.5 } ], status: "A" }, { _id: 2, cust_id: "Ant O. Knee", ord_date: new Date("2020-03-08"), price: 70, items: [ { sku: "oranges", qty: 8, price: 2.5 }, { sku: "chocolates", qty: 5, price: 10 } ], status: "A" }, { _id: 3, cust_id: "Busby Bee", ord_date: new Date("2020-03-08"), price: 50, items: [ { sku: "oranges", qty: 10, price: 2.5 }, { sku: "pears", qty: 10, price: 2.5 } ], status: "A" }, { _id: 4, cust_id: "Busby Bee", ord_date: new Date("2020-03-18"), price: 25, items: [ { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" }, { _id: 5, cust_id: "Busby Bee", ord_date: new Date("2020-03-19"), price: 50, items: [ { sku: "chocolates", qty: 5, price: 10 } ], status: "A"}, { _id: 6, cust_id: "Cam Elot", ord_date: new Date("2020-03-19"), price: 35, items: [ { sku: "carrots", qty: 10, price: 1.0 }, { sku: "apples", qty: 10, price: 2.5 } ], status: "A" }, { _id: 7, cust_id: "Cam Elot", ord_date: new Date("2020-03-20"), price: 25, items: [ { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" }, { _id: 8, cust_id: "Don Quis", ord_date: new Date("2020-03-20"), price: 75, items: [ { sku: "chocolates", qty: 5, price: 10 }, { sku: "apples", qty: 10, price: 2.5 } ], status: "A" }, { _id: 9, cust_id: "Don Quis", ord_date: new Date("2020-03-20"), price: 55, items: [ { sku: "carrots", qty: 5, price: 1.0 }, { sku: "apples", qty: 10, price: 2.5 }, { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" }, { _id: 10, cust_id: "Don Quis", ord_date: new Date("2020-03-23"), price: 25, items: [ { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" } ])
2.3.1 返回每个用户的订单总额
在该实例中,主要步骤如下:
- 先按照cust_id进行分组
- 然后再将分组的用户下所有订单的价格相加求和
- 定义map处理函数,用于处理输入的文档
- 在这个函数中,
this
指代了map-reduce正在处理的文档 - 函数中映射了cust_id和price的字段,并将映射字段发出到下一个阶段
var mapFunction1 = function() { emit(this.cust_id, this.price); };
- 在这个函数中,
- 定义一个reduce函数,包含了两个入参
keyCustId
和valuesPrice
- 其中
valuesPrice
是从map函数中发送的数据,其中是按照cust_id进行分组后的值列表 - 这个函数主要计算了
valuesPrice
的和var reduceFunction1 = function(keyCustId, valuesPrices) { return Array.sum(valuesPrices); };
- 其中
- 定义map-reduce,并使用上面定义的两个函数
db.orders.mapReduce( mapFunction1, reduceFunction1, { out: "map_reduce_example" } )
在这个操作中,是将结果写入到了map_reduce_example
集合中,如果集合存在,会用map-reduce产生的结果替换掉集合中的内容。
- 查询map-reduce产生的结果
db.map_reduce_example.find().sort( { _id: 1 } )
则完整的查询语句为:
db.orders.mapReduce( function() { emit(this.cust_id, this.price); }, function(custIdKey, pricesValue) { return Array.sum(pricesValue); }, {out: "map_reduce_example"} );
这个操作我在mongosh中没有操作成功,提示:
MongoshUnimplementedError: [ASYNC-10003] Unable to handle 'this' keyword outside of method definition
, 需要通过三方的mongodb的工具来操作
则对应的返回结果为:
{ "_id" : "Ant O. Knee", "value" : 95 } { "_id" : "Busby Bee", "value" : 125 } { "_id" : "Cam Elot", "value" : 60 } { "_id" : "Don Quis", "value" : 155 }
这里的操作可以通过db.runCommand()
方法来替换,则对应的查询为:
db.runCommand({ mapReduce: "orders", map: function() { emit(this.cust_id, this.price); }, reduce: function(custIdKey, pricesValue) { return Array.sum(pricesValue); }, out: "map_reduce_example2" });
2.3.2 计算没人购买商品的平均数量
在这个实例中,需要查询出订单时间大于2020-03-01
的所有订单,并通过map-reduce
结算订单的平均数量。
在这个map-reduce中,主要涉及到操作步骤如下:
- 先按照
item.sku
进行分组,并计算每个sku
在订单中购买的数量总和 - 计算每个sku的平均购买数量,并将结果合并到集合中
在向输出集合中插入数据的时候,有以下两种情况:
- 如果新结果的key已经存在,则用新结果的key替换已经存在文档
- 如果key不存在,则向文档中新插入一条数据。
- 定义map函数
- 在函数中,this指代了map-reduce正在处理的文档
- 对于订单中的每个item, 都会生成一个新的value对象,对象中包含了
qty
和count
字段var mapFunction2 = function() { for (var idx = 0; idx < this.items.length; idx++) { var key = this.items[idx].sku; var value = { count: 1, qty: this.items[idx].qty }; emit(key, value); } };
- 定义reduce函数,用于map输出的新的文档数据,在该函数中包含了
keySKU
和countObjVals
参数:countObjVals
为一个数组,是按照keySKU
分组之后得到的分组结果集- reduce的函数将
countObjVals
转换为一个对象reducedObj
,其中包括了count
和qty
字段信息 - 在
reducedObj
中,count
记录了商品keySKU
的数量,qty
记录了在订单中的SKU的总数var reduceFunction2 = function(keySKU, countObjVals) { reducedVal = { count: 0, qty: 0 }; for (var idx = 0; idx < countObjVals.length; idx++) { reducedVal.count += countObjVals[idx].count; reducedVal.qty += countObjVals[idx].qty; } return reducedVal; };
- 定义finalize函数,该函数包含了两个参数
key
和reducedVal
,其中该函数用户处理对平均值的计算,并将结果放到reducedVal.avg
字段中。对应的函数定义如下:var finalizeFunction2 = function (key, reducedVal) { reducedVal.avg = reducedVal.qty/reducedVal.count; return reducedVal; };
- 通过定义完成以上的函数后,就可以通过
db.Collection.mapReduce()
定义Map-Reduce
,然后将结果写出到map_reduce_example2
集合中。则对应定义如下:db.orders.mapReduce( mapFunction2, reduceFunction2, { out: { merge: "map_reduce_example2" }, query: { ord_date: { $gte: new Date("2020-03-01") } }, finalize: finalizeFunction2 } );
在该定义中,我们使用query
用以过滤订单时间在2020-03-01
之后的文档,然后将结果数据写入到map_reduce_example2
集合中。
在写入结合的时候,有以下需要注意的点:
- 如果集合不存在,则创建集合
- 如果新的结果数据在集合中存在了相同的key, 则新的结果会覆盖已经存在的结果文档
- 如果新的结果数据key不存在,则直接向集合中插入文档
- 查询结果集
db.map_reduce_example2.find().sort( { _id: 1 } )
通过以上的操作,则完整的操作语句为:
db.orders.mapReduce( function() { for (var idx = 0; idx < this.items.length;idx++) { var key = this.items[idx].sku; var value = {count: 1, qty: this.items[idx].qty} emit(key, value) } }, function(keySKU, countObjVals) { reducedVal = {count: 0, qty: 0} for (var idx = 0; idx < countObjVals.length; idx++) { reducedVal.cout += countObjVals[idx].count; reducedVal.qty += countObjVals[idx].qty; } return reducedVal; }, { out: { merge: "map_reduce_example2" }, query: { ord_date: { $gte: new Date("2020-03-01") } }, finalize: function(key, reducedVal) { reducedVal.avg = reducedVal.qty / reducedVal.count; return reducedVal; } } );
则对应的返回结果为
[ { _id: 'carrots', value: { count: 2, qty: 15, avg: 7.5 } }, { _id: 'chocolates', value: { count: 3, qty: 15, avg: 5 } }, { _id: 'oranges', value: { count: 7, qty: 63, avg: 9 } }, { _id: 'apples', value: { count: 4, qty: 35, avg: 8.75 } }, { _id: 'pears', value: { count: 1, qty: 10, avg: 10 } } ]
在以上的map-reduce中,如果我们使用聚合管道,则可以避免使用自定义函数的方式实现,则具体的聚合管道定义如下:
db.orders.aggregate( [ { $match: { ord_date: { $gte: new Date("2020-03-01") } } }, { $unwind: "$items" }, { $group: { _id: "$items.sku", qty: { $sum: "$items.qty" }, orders_ids: { $addToSet: "$_id" } } }, { $project: { value: { count: { $size: "$orders_ids" }, qty: "$qty", avg: { $divide: [ "$qty", { $size: "$orders_ids" } ] } } } }, { $merge: { into: "agg_alternative_3", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } } ] )
$match
过滤订单时间在2020-03-01
之后的文档$unwind
操作将订单下的items
进行展开,这样只需要处理单独的item数据即可$group
按照item.sku
进行分组,并且结算每个分组的结果值:qty
计算每个分组下item
的qty的和orders_ids
则记录了对应的订单列表,$addToSet
是使用的Set
集合,具有去重功能
$project
则是重新生成新的文档结构,主要包含了value
和_id
两个字段- count: 通过
$size
获取orders_ids
的长度 - qty: 则是订单items的数量之和
avg
:则是通过$divide
求qty/(orders_ids.size)
的平均值
- count: 通过
$merge
操作将最终的结果写入到agg_alternative_3
集合中,whenMatched
: 表示在有相同的key的时候,则使用replace替换久的文档whenNotMatched
:表示在没有匹配到相同的key是,则执行Insert操作
2.4 增量数据处理
在上面的例子中,主要演示了对于静态数据的处理逻辑,map-reduce是对整个集合中的数据进行处理。但现实中往往是数据在持续的增长,希望能够对增量数据的处理,而不是每次都是对集合中的全量数据进行处理。
为了能够处理增量的数据,在map-reduce中需要做一些特殊的处理:
- 在当前的集合中执行map-reduce任务,但是将结果写出到一个单独的集合
- 当有更多的任务需要执行的时候,则在map-reduce任务中加入一下条件:
- 通过
query
条件过滤出新增文档 - 在
out
阶段中将新的结果合并到已经存在的结果文档中
- 通过
2.4.1 map-reduce定义
在下面的实例中,还是使用官方的实例实现:
首先做数据准备,userSessions
用于存储用户的session信息:
db.usersessions.insertMany([ { userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 }, { userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 }, { userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 }, { userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 }, { userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 }, { userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 }, { userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 }, { userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 } ])
- 首先定义map函数,map函数主要对数据进行分组,并输出结果:
var mapFunction = function() { var key = this.userid; var value = { total_time: this.length, count: 1, avg_time: 0 }; emit( key, value ); };
该方法中首先按照userid
进行数据分组,然后设置value
对象,对象中包含了total_time
, count
, avt_time
字段
- 然后定义reduce函数,用于处理map中的输出文档,该函数主要包含了两个参数,key代表了user_id, values则是分组后的数据
var reduceFunction = function(key, values) { var reducedObject = { total_time: 0, count:0, avg_time:0 }; values.forEach(function(value) { reducedObject.total_time += value.total_time; reducedObject.count += value.count; }); return reducedObject; };
reduce函数中对用户分组的数据进行聚合,计算用户登陆的总时间total_time
和登陆次数count
, 并将结果写入到reducedObject
中。
- 然后定义finalize函数,该函数也包含了两个入参信息,key也表示了user_id信息,reducedValue则是reduce函数的产出结果。
var finalizeFunction = function(key, reducedValue) { if (reducedValue.count > 0) reducedValue.avg_time = reducedValue.total_time / reducedValue.count; return reducedValue; };
finalize函数对最终结果进行了处理,计算出了用户平均登陆时间, 即total_time/count
- 通过以上函数定义,则定义map-reduce的处理任务,则处理任务为:
db.usersessions.mapReduce( mapFunction, reduceFunction, { out: "session_stats", finalize: finalizeFunction } )
在数据处理完成之后,就能够将最终的结果写出到session_stats
集合中,通过查询该集合就能获取到map-reduce的结果。
- 查询最终结果
db.session_stats.find().sort( { _id: 1 } )
将以上的每个步骤的操作合并到一起,最终的map-reduce定义为
db.usersessions.mapReduce(function() { var key = this.userid; var value = { total_time: this.length, count: 1, avg_time: 0 }; emit( key, value ); }, function(key, values){ var reducedObject = { total_time: 0, count:0, avg_time:0 }; values.forEach(function(value) { reducedObject.total_time += value.total_time; reducedObject.count += value.count; }); return reducedObject; }, {out: "session_stats", finalize: function(key, reducedValue) { if (reducedValue.count > 0) reducedValue.avg_time = reducedValue.total_time / reducedValue.count; return reducedValue; }});
2.4.2 增量数据处理
其后,usersessions中的数据发生了增长,此时map-reduce任务要能够重新跑起来,并且能够获取到正确的结果,此时我们只需要针对增量的数据进行处理:
增量数据如下:
db.usersessions.insertMany([ { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 }, { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 }, { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 }, { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 } ])
则只需要通过query
过滤出新增的数据,然后再次将map-reduce任务跑一次,依然将结果输入到session_stats
集合中即可,则对应的操作为:
db.usersessions.mapReduce( mapFunction, reduceFunction, { query: { ts: { $gte: ISODate('2020-03-05 00:00:00') } }, out: { reduce: "session_stats" }, finalize: finalizeFunction } );
则完整的操作为
db.usersessions.mapReduce(function() { var key = this.userid; var value = { total_time: this.length, count: 1, avg_time: 0 }; emit( key, value ); }, function(key, values){ var reducedObject = { total_time: 0, count:0, avg_time:0 }; values.forEach(function(value) { reducedObject.total_time += value.total_time; reducedObject.count += value.count; }); return reducedObject; }, {out: {reduce:"session_stats"}, query: { ts: { $gte: ISODate('2020-03-05 00:00:00') } }, finalize: function(key, reducedValue) { if (reducedValue.count > 0) reducedValue.avg_time = reducedValue.total_time / reducedValue.count; return reducedValue; }});
在上面的操作中主要有一下几个点:
- out输出时,并不是直接已还已经存在的数据,而是使用了
reduce
操作,对已有数据做累加的操作 - query增加了过滤的条件,只是过滤出了新增的文档数据部分。
2.4.3 聚合管道实现
在以上操作中,都可以使用聚合管道进行实现,重置下数据源信息:
db.usersessions.drop(); db.usersessions.insertMany([ { userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 }, { userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 }, { userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 }, { userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 }, { userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 }, { userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 }, { userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 }, { userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 } ])
则使用聚合管道实现方式如下
db.usersessions.aggregate([ { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } }, { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } }, { $merge: { into: "session_stats_agg", whenMatched: [ { $set: { "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] }, "value.count": { $add: [ "$value.count", "$$new.value.count" ] }, "value.avg": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] } } } ], whenNotMatched: "insert" }} ])
- $group“操作则是按照userid字段进行分组,在分组中可以执行其他操作:
$sum
计算登陆时长的总和,并将统计和的值映射到total_time字段$avg
操作则是对分组内的所有数据登陆时间求平均值,并将结果放入到avg_time
字段中
$project
则是对文档数据进行重构,则只包含了_id
和value
字段$merge
则是对输出的最终结果进行合并,into
将结果文档写入到session_stats_agg
集合中,当集合不存在时,则创建whenMatched
则是对结果进行合并,将新增和旧值进行相加whenNotMatched
则直接插入文档
当有新输入插入时,则只需要配合$match
操作实现对新数据过滤即可,新增数据如下:
db.usersessions.insertMany([ { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 }, { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 }, { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 }, { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 } ])
则在聚合管道中新增$match
操作,过滤出新增的数据:
db.usersessions.aggregate([ { $match: { ts: { $gte: ISODate('2020-03-05 00:00:00') } } }, { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } }, { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } }, { $merge: { into: "session_stats_agg", whenMatched: [ { $set: { "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] }, "value.count": { $add: [ "$value.count", "$$new.value.count" ] }, "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] } } } ], whenNotMatched: "insert" }} ])
在这里为了避免每次都需要修改对应的过滤事件,可以将对应的操作定义成为一个函数,事件通过参数的方式传入:
updateSessionStats = function(startDate) { db.usersessions.aggregate([ { $match: { ts: { $gte: startDate } } }, { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } }, { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } }, { $merge: { into: "session_stats_agg", whenMatched: [ { $set: { "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] }, "value.count": { $add: [ "$value.count", "$$new.value.count" ] }, "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] } } } ], whenNotMatched: "insert" }} ]); };
则过滤对应的数据的方式可以变更为:
updateSessionStats(ISODate('2020-03-05 00:00:00'))