My problem is sorting the values in a file. Keys and values are both integers, and I need to keep each value's key attached after sorting by value.
key value
1 24
3 4
4 12
5 23
Expected output:
1 24
5 23
4 12
3 4
I am working with massive data and must run the code on a cluster of Hadoop machines. How can I do this with MapReduce?
Sorting in Hadoop helps the reducer easily distinguish when a new reduce call should start, which saves the reducer time: the reducer starts a new reduce call when the next key in the sorted input differs from the previous one. Each reduce call takes key-value pairs as input and generates key-value pairs as output.
Sorting is one of the basic MapReduce operations for processing and analyzing data. The framework automatically sorts the key-value pairs emitted by the mappers by key before they reach the reducers.
Shuffling in MapReduce is the process by which the system performs this sort and then transfers the map output to the reducers as input. This is why the shuffle phase is necessary for the reducers; otherwise, they would have no input (or unsorted input from every mapper).
You can probably do this (I'm assuming you are using Java here).
From your mappers, emit the pairs with value and key swapped:
context.write(24, 1);
context.write(4, 3);
context.write(12, 4);
context.write(23, 5);
So, all the values that need to be sorted should be the keys in your MapReduce job. Hadoop sorts by ascending key by default.
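To see what this swap buys you end-to-end, here is a small plain-Java simulation of the swap, sort, swap-back pipeline on the example data. It runs without Hadoop; the class and method names are mine for illustration, not part of any Hadoop API:

```java
import java.util.ArrayList;
import java.util.List;

public class SwapSortDemo {
    // Simulates the MapReduce flow locally: the "map" step swaps each
    // (key, value) pair to (value, key), the framework's shuffle sort is
    // replaced by an in-memory descending sort on the new key, and the
    // "reduce" step swaps each pair back so output reads "key value" again.
    public static List<int[]> sortByValueDescending(int[][] pairs) {
        List<int[]> swapped = new ArrayList<>();
        for (int[] p : pairs) {
            swapped.add(new int[]{p[1], p[0]}); // map: emit (value, key)
        }
        swapped.sort((a, b) -> Integer.compare(b[0], a[0])); // sort descending
        List<int[]> out = new ArrayList<>();
        for (int[] p : swapped) {
            out.add(new int[]{p[1], p[0]}); // reduce: emit (key, value)
        }
        return out;
    }

    public static void main(String[] args) {
        int[][] input = {{1, 24}, {3, 4}, {4, 12}, {5, 23}};
        for (int[] p : sortByValueDescending(input)) {
            System.out.println(p[0] + " " + p[1]);
        }
        // 1 24
        // 5 23
        // 4 12
        // 3 4
    }
}
```

In the real job the descending sort in the middle is what the comparator below provides; everything else is just the key/value swap on each side.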
Hence, to sort in descending order, you can either use the built-in decreasing comparator:
job.setSortComparatorClass(LongWritable.DecreasingComparator.class);
Or set a custom descending sort comparator on your job, which goes something like this:
public static class DescendingKeyComparator extends WritableComparator {
    protected DescendingKeyComparator() {
        super(LongWritable.class, true); // register for LongWritable keys
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        LongWritable key1 = (LongWritable) w1;
        LongWritable key2 = (LongWritable) w2;
        return -1 * key1.compareTo(key2);
    }
}
The shuffle and sort phase in Hadoop will then take care of sorting your keys in descending order: 24, 23, 12, 4.
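The negation trick inside DescendingKeyComparator is not specific to Hadoop; negating an ascending comparator's result reverses the order for any type. A plain-Java sketch of the same idea (class name is mine, for illustration):

```java
import java.util.Arrays;
import java.util.Comparator;

public class DescendingDemo {
    // Negating the result of an ascending compareTo reverses the order,
    // exactly as DescendingKeyComparator does for LongWritable keys.
    static final Comparator<Long> DESCENDING = (a, b) -> -1 * a.compareTo(b);

    public static Long[] sortDescending(Long[] keys) {
        Long[] sorted = keys.clone();
        Arrays.sort(sorted, DESCENDING);
        return sorted;
    }

    public static void main(String[] args) {
        // The swapped keys from the example: 24, 4, 12, 23
        System.out.println(Arrays.toString(sortDescending(new Long[]{24L, 4L, 12L, 23L})));
        // [24, 23, 12, 4]
    }
}
```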
Edit (after the comment):
If you require a descending IntWritable comparator, you can create one and use it like this:
job.setSortComparatorClass(DescendingIntWritableComparable.DecreasingComparator.class);
In case you are using the old JobConf API, set it with:
jobConfObject.setOutputKeyComparatorClass(DescendingIntWritableComparable.DecreasingComparator.class);
Your driver's main() function would look something like this:
public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new YourDriver(), args);
    System.exit(exitCode);
}
// This class is defined outside of main(), not inside it.
public static class DescendingIntWritableComparable extends IntWritable {
    /** A decreasing Comparator optimized for IntWritable. */
    public static class DecreasingComparator extends IntWritable.Comparator {
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            return -super.compare(a, b);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return -super.compare(b1, s1, l1, b2, s2, l2);
        }
    }
}