2014年2月5日 星期三

[MongoDB] 透過 MapReduce 進行 Collections Join @ Ubuntu 12.04

使用 NOSQL 時,其先天設計無法提供像 MySQL Table Joins 等招數,此時的解法就是把兩個 tables 倒進一張 table 來解決。

以 MongoDB command line 操作,新增資料:

> use mydb
switched to db mydb
> db.t1.insert({"data":1, "data2":2, "data3":3})
> db.t1.insert({"data":4, "data2":5, "data3":6})
> db.t1.insert({"data":7, "data2":8, "data3":9})
> db.t1.find()
{ "_id" : ObjectId("52f1f9c7bf6317bb2615b216"), "data" : 1, "data2" : 2, "data3" : 3 }
{ "_id" : ObjectId("52f1f9cbbf6317bb2615b217"), "data" : 4, "data2" : 5, "data3" : 6 }
{ "_id" : ObjectId("52f1f9cebf6317bb2615b218"), "data" : 7, "data2" : 8, "data3" : 9 }
> db.t2.insert({"data4":3, "data5":2, "data6":1})
> db.t2.insert({"data4":6, "data5":5, "data6":4})
> db.t2.insert({"data4":9, "data5":8, "data6":7})
> db.t2.find()
{ "_id" : ObjectId("52f1f9d4bf6317bb2615b219"), "data4" : 3, "data5" : 2, "data6" : 1 }
{ "_id" : ObjectId("52f1f9d7bf6317bb2615b21a"), "data4" : 6, "data5" : 5, "data6" : 4 }
{ "_id" : ObjectId("52f1f9dbbf6317bb2615b21b"), "data4" : 9, "data5" : 8, "data6" : 7 }


定義 t1 跟 t2 的 mapper function:

> t1_map = function() {
emit(this.data3, { "data": this.data, "data2": this.data2, "data3": this.data3 });
}
> t2_map = function() {
emit(this.data4, { "data4": this.data4, "data5": this.data5, "data6": this.data6 });
}


定義 reducer function:

> reducer = function(key, values){
var result = {} ;
values.forEach( function(value){
var field;
for(field in value)
if( value.hasOwnProperty(field) )
result[field] = value[field];
});
return result;
}


進行 map-reduce:

> db.tmp.drop()
true
> db.t1.mapReduce(t1_map, reducer , {"out":{"reduce":"tmp"}} )
{
        "result" : "tmp",
        "timeMillis" : 10,
        "counts" : {
                "input" : 3,
                "emit" : 3,
                "reduce" : 0,
                "output" : 3
        },
        "ok" : 1,
}
> db.tmp.find()
{ "_id" : 3, "value" : { "data" : 1, "data2" : 2, "data3" : 3 } }
{ "_id" : 6, "value" : { "data" : 4, "data2" : 5, "data3" : 6 } }
{ "_id" : 9, "value" : { "data" : 7, "data2" : 8, "data3" : 9 } }
> db.t2.mapReduce(t2_map, reducer , {"out":{"reduce":"tmp"}} )
{
        "result" : "tmp",
        "timeMillis" : 6,
        "counts" : {
                "input" : 3,
                "emit" : 3,
                "reduce" : 0,
                "output" : 3
        },
        "ok" : 1,
}
> db.tmp.find()
{ "_id" : 3, "value" : { "data4" : 3, "data5" : 2, "data6" : 1, "data" : 1, "data2" : 2, "data3" : 3 } }
{ "_id" : 6, "value" : { "data4" : 6, "data5" : 5, "data6" : 4, "data" : 4, "data2" : 5, "data3" : 6 } }
{ "_id" : 9, "value" : { "data4" : 9, "data5" : 8, "data6" : 7, "data" : 7, "data2" : 8, "data3" : 9 } }


以上的概念就是 t1 採用 data3 作為 join key,而 t2 採用 data4 ,分別跑 map-reduce 後儲存在 tmp 這張表(collection),如此 tmp 即為常用的 SQL Join 結果。

以上須留意的是跑 map-reduce 時,要指定 out 為 reduce 形態,細節可以在 out Options 查看,可分成 replace, reduce, merge ,若沒指定為 reduce 的話,上述最後結果僅有 t2 的資料。

此外,在 pymongo 上也能達成,有興趣可以參考 map-reduce-join.py,寫的彈性一點,所以閱讀性較差,用法:

$ python map-reduce-join.py --host localhost --database mydb --reset-result --result tmp --show-result --select-out-1 data data2 data3 --join-key-1 data3 --select-out-2 data4 data5 data6 --join-key-2 data4

結果:

Mapper 1 Code:
                function() {
                        emit(this.data3, {"data": this.data, "data2": this.data2, "data3": this.data3});
                }

Reducer 1 Code:
                function(key, values) {
                        var out = {};
                        values.forEach( function(value) {
                                for ( field in value )
                                        if( value.hasOwnProperty(field) )
                                                out[field] = value[field];

                        });
                        return out
                }

{u'counts': {u'input': 3, u'reduce': 0, u'emit': 3, u'output': 3}, u'timeMillis': 34, u'ok': 1.0, u'result': u'tmp'}
{u'_id': 3.0, u'value': {u'data': 1.0, u'data3': 3.0, u'data2': 2.0}}
{u'_id': 6.0, u'value': {u'data': 4.0, u'data3': 6.0, u'data2': 5.0}}
{u'_id': 9.0, u'value': {u'data': 7.0, u'data3': 9.0, u'data2': 8.0}}
Mapper 2 Code:
                function() {
                        emit(this.data4, {"data4": this.data4, "data5": this.data5, "data6": this.data6});
                }

Reducer 2 Code:
                function(key, values) {
                        var out = {};
                        values.forEach( function(value) {
                                for ( field in value )
                                        if( value.hasOwnProperty(field) )
                                                out[field] = value[field];
                        });
                        return out
                }

{u'counts': {u'input': 3, u'reduce': 0, u'emit': 3, u'output': 3}, u'timeMillis': 5, u'ok': 1.0, u'result': u'tmp'}
{u'_id': 3.0, u'value': {u'data5': 2.0, u'data4': 3.0, u'data6': 1.0, u'data': 1.0, u'data3': 3.0, u'data2': 2.0}}
{u'_id': 6.0, u'value': {u'data5': 5.0, u'data4': 6.0, u'data6': 4.0, u'data': 4.0, u'data3': 6.0, u'data2': 5.0}}
{u'_id': 9.0, u'value': {u'data5': 8.0, u'data4': 9.0, u'data6': 7.0, u'data': 7.0, u'data3': 9.0, u'data2': 8.0}}

沒有留言:

張貼留言