I'm working on something similar to the canonical MapReduce example - the word count, but with a twist in that I'm looking to only get the Top N results.
Let's say I have a very large set of text data in HDFS. There are plenty of examples that show how to build a Hadoop MapReduce job that will provide you with a word count for every word in that text. For example, if my corpus is:
"This is a test of test data and a good one to test this"
The result set from the standard MapReduce word count job would be:
test:3, a:2, this:2, is: 1, etc..
But what if I ONLY want to get the Top 3 words that were used in my entire set of data?
I can still run the exact same standard MapReduce word-count job, and then just take the Top 3 results once it is ready and is spitting out the count for EVERY word, but that seems a little inefficient, because a lot of data needs to be moved around during the shuffle phase.
What I'm thinking is that, if this sample is large enough, and the data is well randomly and well distributed in HDFS, that each Mapper does not need to send ALL of its word counts to the Reducers, but rather, only some of the top data. So if one mapper has this:
a:8234, the: 5422, man: 4352, ...... many more words ... , rareword: 1, weirdword: 1, etc.
Then what I'd like to do is only send the Top 100 or so words from each Mapper to the Reducer phase - since there is very little chance that "rareword" will suddenly end up in the Top 3 when all is said and done. This seems like it would save on bandwidth and also on Reducer processing time.
Can this be done in the Combiner phase? Is this sort of optimization prior to the shuffle phase commonly done?
Approach Used: Using TreeMap. Here, the idea is to use Mappers to find local top 10 records, as there can be many Mappers running parallelly on different blocks of data of a file. And then all these local top 10 records will be aggregated at Reducer where we find top 10 global records for the file.
The text from the input text file is tokenized into words to form a key value pair with all the words present in the input text file. The key is the word from the input file and value is '1'. This is how the MapReduce word count program executes and outputs the number of occurrences of a word in any given input file.
MapReduce Word Count is a framework which splits the chunk of data, sorts the map outputs and input to reduce tasks. A File-system stores the output and input of jobs. Re-execution of failed tasks, scheduling them and monitoring them is the task of the framework.
This is a very good question, because you have hit the inefficiency of Hadoop's word count example.
The tricks to optimize your problem are the following:
Do a HashMap
based grouping in your local map stage, you can also use a combiner for that. This can look like this, I'm using the HashMultiSet
of Guava, which faciliates a nice counting mechanism.
public static class WordFrequencyMapper extends
Mapper<LongWritable, Text, Text, LongWritable> {
private final HashMultiset<String> wordCountSet = HashMultiset.create();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] tokens = value.toString().split("\\s+");
for (String token : tokens) {
wordCountSet.add(token);
}
}
And you emit the result in your cleanup stage:
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
Text key = new Text();
LongWritable value = new LongWritable();
for (Entry<String> entry : wordCountSet.entrySet()) {
key.set(entry.getElement());
value.set(entry.getCount());
context.write(key, value);
}
}
So you have grouped the words in a local block of work, thus reducing network usage by using a bit of RAM. You can also do the same with a Combiner
, but it is sorting to group- so this would be slower (especially for strings!) than using a HashMultiset
.
To just get the Top N, you will only have to write the Top N in that local HashMultiset
to the output collector and aggregate the results in your normal way on the reduce side.
This saves you a lot of network bandwidth as well, the only drawback is that you need to sort the word-count tuples in your cleanup method.
A part of the code might look like this:
Set<String> elementSet = wordCountSet.elementSet();
String[] array = elementSet.toArray(new String[elementSet.size()]);
Arrays.sort(array, new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
// sort descending
return Long.compare(wordCountSet.count(o2), wordCountSet.count(o1));
}
});
Text key = new Text();
LongWritable value = new LongWritable();
// just emit the first n records
for(int i = 0; i < N, i++){
key.set(array[i]);
value.set(wordCountSet.count(array[i]));
context.write(key, value);
}
Hope you get the gist of doing as much of the word locally and then just aggregate the top N of the top N's ;)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With