I'm stuck at a point where I have to force MongoDB's MapReduce framework to use only one reducer for a specific key. I also want to control how the framework sorts keys. I'll introduce the problem with an example:
I would like to emit key-value pairs in the following form:
< b x b > : < d1 >
< b x > : < d2 >
< b > : < d3 >
< b a x > : < d2, d3 >
figure 1
The key is a sequence, as you can see, and each one starts with the item b, which would be of data type string. The values would be ObjectIDs, denoted by the letter d and a number. There are other key-value pairs I emit from the map function whose keys start with a different item, for example a or x:
< a b x > : < d1 >
< a x > : < d3 >
< x a a > : < d3 >
figure 2
I need to force the framework to call a single reduce function for all key-value pairs whose keys start with a specific item. I also have to force a sort between map and reduce that puts the keys in reverse lexicographic order. So a single reducer would receive the following key-value pairs for item b:
< b x b > : < d1 >
< b x > : < d2 >
< b a x > : < d2, d3 >
< b > : < d3 >
figure 3
What I've tried:
I've tried to emit key-value pairs in the following form:
b : < (d1 : < b x b >) >
b : < (d2 : < b x >) >
b : < (d3 : < b >) >
b : < (d2 : < b a x >), (d3 : < b a x >) >
figure 4
This way a single reducer received the values for item b, but as you can see not in reverse lexicographic order, and worst of all, there is no guarantee that a single reducer will get all values for a specific key (as MongoDB's MapReduce documentation states).
Basically: I have to process the sequences that start with a specific item in reverse lexicographic order.
I have no ideas that would lead me further toward a solution. How can I enforce a single reducer per key and affect the sorting? How should I design the data structure I emit to meet these needs?
These functions would be similar to Hadoop's Comparator and Partitioner.
UPDATE
It has been pointed out to me by Asya Kamsky that finalize runs only once per key, so it solves the partitioning problem, where every value for a specific key must be seen by a single reducer.
Sorting is still a problem. For large datasets, implementing my own sort inside finalize would be a huge bottleneck in terms of execution time, and I would not be using the natural sort mechanism between map and reduce. The keys are of data type string, but it would be easy to encode them as negative integers to force reverse sorting.
Let's check figure 3 again:
< b x b > : < d1 >
< b x > : < d2 >
< b a x > : < d2, d3 >
< b > : < d3 >
figure 3
This is what finalize would have to receive for key b. The keys, for example < b x b >, are composite here. Finalize would need to receive the keys that start with b, with the remaining parts of each key in reverse lexicographic order.
Is there any way to achieve this and avoid the sort inside finalize?
What you can do is emit documents "normally" and use reduce to combine all emitted values into a sorted array. Then use the finalize method to do whatever processing you were going to do in a single reducer.
MongoDB's reduce function may be called more than once for a key, but it may also never be called (when only a single value is emitted for a particular key). Using finalize solves both problems, as it is called exactly once per key.
Sample data:
> db.sorts.find()
{ "_id" : 1, "b" : 1, "a" : 20 }
{ "_id" : 2, "b" : 1, "a" : 2 }
{ "_id" : 3, "b" : 2, "a" : 12 }
{ "_id" : 4, "b" : 3, "a" : 1 }
{ "_id" : 5, "b" : 2, "a" : 1 }
{ "_id" : 6, "b" : 3, "a" : 11 }
{ "_id" : 7, "b" : 3, "a" : 5 }
{ "_id" : 8, "b" : 2, "a" : 1 }
{ "_id" : 9, "b" : 1, "a" : 15 }
Map function:
map = function() {
    emit( this.b, { val: [ this.a ] } );
}
Reduce function, which inserts each incoming value into a sorted array by walking the array:
reduce = function( key, values ) {
    var result = { val: [ ] };
    values.forEach(function(v) {
        // v.val can hold several items when it is the output of an
        // earlier reduce call, so merge all of them, not just v.val[0]
        v.val.forEach(function(newval) {
            var added = false;
            for (var i = 0; i < result.val.length; i++) {
                if (newval < result.val[i]) {
                    result.val.splice(i, 0, newval);
                    added = true;
                    break;
                }
            }
            if ( !added ) {
                // larger than everything seen so far: append at the end
                result.val.push(newval);
            }
        });
    });
    return result;
}
Finalize just returns a simple array:
finalize = function( key, values ) {
    // values is document with a sorted array
    // do your "single reduce" functionality here
    return values.val;
}
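Since reduce may be fed its own earlier output, it is worth checking outside MongoDB that reducing in one pass and reducing in two passes give the same array. The following is only a sketch in plain JavaScript: reduceSorted is a hypothetical stand-in with the same value shape ({ val: [...] }) that merges every element of each incoming array, and the input mimics the three values emitted for key 1 in the sample data.

```javascript
// Hypothetical stand-in for a reduce function (an assumption of this
// sketch): merges every element of every incoming array into one
// ascending array.
function reduceSorted(key, values) {
    var result = { val: [] };
    values.forEach(function (v) {
        v.val.forEach(function (item) {
            // walk to the first position whose element is larger
            var i = 0;
            while (i < result.val.length && result.val[i] <= item) {
                i++;
            }
            result.val.splice(i, 0, item);
        });
    });
    return result;
}

// Values emitted for key 1 in the sample data (a = 20, 2, 15).
var emitted = [{ val: [20] }, { val: [2] }, { val: [15] }];

// One reduce call over everything...
var once = reduceSorted(1, emitted);

// ...versus a partial reduce whose output is fed back in (re-reduce).
var partial = reduceSorted(1, emitted.slice(0, 2));
var twice = reduceSorted(1, [partial, emitted[2]]);

console.log(JSON.stringify(once.val), JSON.stringify(twice.val));
```

If the two arrays differ, the reduce function is dropping or misordering values whenever MongoDB calls it more than once for the same key.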
Run MapReduce:
> db.sorts.mapReduce(map, reduce, {out:"outs", finalize:finalize})
{
    "result" : "outs",
    "timeMillis" : 10,
    "counts" : {
        "input" : 9,
        "emit" : 9,
        "reduce" : 3,
        "output" : 3
    },
    "ok" : 1,
}
Result is:
> db.outs.find()
{ "_id" : 1, "value" : [ 2, 15, 20 ] }
{ "_id" : 2, "value" : [ 1, 1, 12 ] }
{ "_id" : 3, "value" : [ 1, 5, 11 ] }
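The same idea could be applied to the sequences from the question. What follows is a rough sketch, not a tested MongoDB job: it assumes map emits the first item of each sequence as the key and a value of the hypothetical shape { seqs: [ { seq: [...], ids: [...] } ] }, with plain strings standing in for ObjectIDs. The names compareReverseLex and reduceSeqs are made up for illustration. Reduce keeps the merged array in reverse lexicographic order, so finalize would receive it already sorted.

```javascript
// Comparator for sequences (arrays of strings) in reverse lexicographic
// order: returns a negative number when a should come before b.
function compareReverseLex(a, b) {
    var n = Math.min(a.length, b.length);
    for (var i = 0; i < n; i++) {
        if (a[i] !== b[i]) {
            return a[i] < b[i] ? 1 : -1;
        }
    }
    // one sequence is a prefix of the other: the longer one comes first
    return b.length - a.length;
}

// Hypothetical reduce: concatenates the entries of all incoming values
// and keeps them sorted, so its output can safely be reduced again.
function reduceSeqs(key, values) {
    var merged = { seqs: [] };
    values.forEach(function (v) {
        merged.seqs = merged.seqs.concat(v.seqs);
    });
    merged.seqs.sort(function (x, y) {
        return compareReverseLex(x.seq, y.seq);
    });
    return merged;
}

// Values a map function might emit for key "b" (figure 4 in the question).
var emittedForB = [
    { seqs: [{ seq: ["b", "x"], ids: ["d2"] }] },
    { seqs: [{ seq: ["b"], ids: ["d3"] }] },
    { seqs: [{ seq: ["b", "x", "b"], ids: ["d1"] }] },
    { seqs: [{ seq: ["b", "a", "x"], ids: ["d2", "d3"] }] }
];

// Simulate a partial reduce followed by a re-reduce.
var partialB = reduceSeqs("b", emittedForB.slice(0, 2));
var reducedB = reduceSeqs("b", [partialB].concat(emittedForB.slice(2)));

var orderB = reducedB.seqs.map(function (e) { return e.seq.join(" "); });
console.log(orderB); // [ 'b x b', 'b x', 'b a x', 'b' ]
```

The resulting order matches figure 3. Sorting inside reduce spreads the work over the partial reduce calls, at the cost of re-sorting data that is already ordered; a merge of pre-sorted runs would avoid that if it turns out to matter.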