Hadoop Version: 0.20.2 (On Amazon EMR)
Problem: I have a custom key that i write during map phase which i added below. During the reduce call, I do some simple aggregation on values for a given key. Issue I am facing is that during the iteration of values in reduce call, my key got changed and i got values of that new key.
My key type:
class MyKey implements WritableComparable<MyKey>, Serializable {
private MyEnum type; //MyEnum is a simple enumeration.
private TreeMap<String, String> subKeys;
MyKey() {} //for hadoop
public MyKey(MyEnum t, Map<String, String> sK) { type = t; subKeys = new TreeMap(sk); }
public void readFields(DataInput in) throws IOException {
Text typeT = new Text();
typeT.readFields(in);
this.type = MyEnum.valueOf(typeT.toString());
subKeys.clear();
int i = WritableUtils.readVInt(in);
while ( 0 != i-- ) {
Text keyText = new Text();
keyText.readFields(in);
Text valueText = new Text();
valueText.readFields(in);
subKeys.put(keyText.toString(), valueText.toString());
}
}
public void write(DataOutput out) throws IOException {
new Text(type.name()).write(out);
WritableUtils.writeVInt(out, subKeys.size());
for (Entry<String, String> each: subKeys.entrySet()) {
new Text(each.getKey()).write(out);
new Text(each.getValue()).write(out);
}
}
public int compareTo(MyKey o) {
if (o == null) {
return 1;
}
int typeComparison = this.type.compareTo(o.type);
if (typeComparison == 0) {
if (this.subKeys.equals(o.subKeys)) {
return 0;
}
int x = this.subKeys.hashCode() - o.subKeys.hashCode();
return (x != 0 ? x : -1);
}
return typeComparison;
}
}
Is there anything wrong with this implementation of key? Following is the code where I am facing the mixup of keys in reduce call:
reduce(MyKey k, Iterable<MyValue> values, Context context) {
Iterator<MyValue> iterator = values.iterator();
int sum = 0;
while(iterator.hasNext()) {
MyValue value = iterator.next();
//when i come here in the 2nd iteration, if i print k, it is different from what it was in iteration 1.
sum += value.getResult();
}
//write sum to context
}
Any help in this would be greatly appreciated.
This is expected behavior (with the new API at least).
When the next
method for the underlying iterator of the values Iterable is called, the next key/value pair is read from the sorted mapper / combiner output, and checked that the key is still part of the same group as the previous key.
Because hadoop re-uses the objects passed to the reduce method (just calling the readFields method of the same object) the underlying contents of the Key parameter 'k' will change with each iteration of the values
Iterable.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With