kagxin / blog

个人博客:技术、随笔、生活
https://github.com/kagxin/blog/issues
7 stars 0 forks source link

mongodb mapReduce #51

Open kagxin opened 4 years ago

kagxin commented 4 years ago

mongodb mapReduce使用

测试数据

import datetime
from mongo import m

# Sample orders for the map-reduce examples below: two customers with two
# orders each, every order holding one "mmm" and one "nnn" line item.
#
# PyMongo treats naive datetimes as UTC, so a naive local ``now()`` would be
# stored with the wrong instant. Take one timezone-aware UTC timestamp and
# derive every order date from it; this also guarantees that "today" and
# "yesterday" are exactly one day apart in all documents.
now = datetime.datetime.now(tz=datetime.timezone.utc)
yesterday = now - datetime.timedelta(days=1)

data = [
    {
        "cust_id": "abc123",
        "ord_date": now,
        "status": 'A',
        "price": 25,
        "items": [{"sku": "mmm", "qty": 5, "price": 2.5},
                  {"sku": "nnn", "qty": 5, "price": 2.5}]
    },
    {
        "cust_id": "abc124",
        "ord_date": yesterday,
        "status": 'A',
        "price": 20,
        "items": [{"sku": "mmm", "qty": 5, "price": 2},
                  {"sku": "nnn", "qty": 5, "price": 2}]
    },
    {
        "cust_id": "abc124",
        "ord_date": now,
        "status": 'A',
        "price": 25,
        "items": [{"sku": "mmm", "qty": 5, "price": 2.5},
                  {"sku": "nnn", "qty": 5, "price": 2.5}]
    },
    {
        "cust_id": "abc123",
        "ord_date": yesterday,
        "status": 'A',
        "price": 20,
        "items": [{"sku": "mmm", "qty": 5, "price": 2},
                  {"sku": "nnn", "qty": 5, "price": 2}]
    }
]

# Load the sample documents into the `orders` collection.
m.get_collection('orders').insert_many(data)

求每一个cust_id的price总和

from mongo import m

# Map step (JavaScript, run by the server): emit one (cust_id, price) pair
# per order document.
map_func = """
function() {
    emit(this.cust_id, this.price);
}
"""

# Reduce step: sum all prices emitted for the same cust_id.
reduce_func = """
function(keyCustId, valuesPrices) {
    return Array.sum(valuesPrices);
}   
"""

# NOTE: Collection.map_reduce is a PyMongo 3.x API (removed in PyMongo 4.0).
# Called with an output collection name and without full_response, it returns
# the output Collection, so printing `res` itself would only show the
# collection's repr. Read the documents back to see the per-customer totals.
res = m.get_collection('orders').map_reduce(map_func, reduce_func, "map_reduce_example")
for doc in res.find():
    print(doc)

结果(输出集合 map_reduce_example 中的文档。注:按上面的 4 条测试数据计算,abc123 应为 25 + 20 = 45;下面的 65.0 可能是作者运行时集合中还有其他订单)

/* 1 */
{
    "_id" : "abc123",
    "value" : 65.0
}

/* 2 */
{
    "_id" : "abc124",
    "value" : 45.0
}

求每一个sku的出现次数(count)、qty的总和,以及qty的均值

from mongo import m

# Empty filter: every document in `orders` is passed to the map function.
query = {}

# Map step (JavaScript, run by the server): for each line item of an order,
# emit (sku, {count: 1, qty: <item qty>}). The per-item debug print was
# dropped; it only wrote a line to the server log for every emitted pair.
map_func2 = """
function() {
    for (var idx = 0; idx < this.items.length; idx++) {
        var key = this.items[idx].sku;
        var value = {
            count: 1,
            qty: this.items[idx].qty
        };
        emit(key, value);
   }
};
"""

# Reduce step: add up the counts and quantities emitted for one sku.
# `var` keeps reducedVal local instead of creating an implicit global.
reduce_func2 = """
function(keySKU, countObjVals) {
    var reducedVal = { count: 0, qty: 0 };

    for (var idx = 0; idx < countObjVals.length; idx++) {
     reducedVal.count += countObjVals[idx].count;
     reducedVal.qty += countObjVals[idx].qty;
    }

    return reducedVal;
};
"""

# Finalize step: runs once per key after reduction and attaches the average
# quantity per line item (total qty / number of items).
finalize_func = """
function (key, reducedVal) {

    reducedVal.avg = reducedVal.qty/reducedVal.count;

    return reducedVal;
};
"""

# NOTE: Collection.map_reduce is a PyMongo 3.x API (removed in PyMongo 4.0).
# With full_response=True the call returns the server's command response
# rather than the output collection; the aggregated documents themselves are
# written to the `map_reduce_example3` collection.
res = m.get_collection('orders').map_reduce(map_func2, reduce_func2, 'map_reduce_example3', full_response=True, finalize=finalize_func, query=query)
print(res)

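结果(输出集合 map_reduce_example3 中的文档。注:按上面的 4 条测试数据计算,每个 sku 应为 count 4、qty 20;下面的 count 5、qty 25 可能是作者运行时集合中多了一条订单)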

/* 1 */
{
    "_id" : "mmm",
    "value" : {
        "count" : 5.0,
        "qty" : 25.0,
        "avg" : 5.0
    }
}

/* 2 */
{
    "_id" : "nnn",
    "value" : {
        "count" : 5.0,
        "qty" : 25.0,
        "avg" : 5.0
    }
}

ref: https://docs.mongodb.com/manual/core/map-reduce/ https://docs.mongodb.com/manual/reference/command/mapReduce/