Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

manipulating iterator in mapreduce

I am trying to find the sum of any given points using hadoop, The issue I am having is on getting all values from a given key in a single reducer. It looks like this.

Reducer:

 public static class Reduce extends MapReduceBase implements
        Reducer<Text, IntWritable, Text, DoubleWritable> {

    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, DoubleWritable> output, Reporter reporter)
            throws IOException {
        Text word = new Text();

        Iterator<IntWritable> tr = values;
        IntWritable v;
        while (tr.hasNext()) {
             v = tr.next();

            Iterator<IntWritable> td = values;
            while (td.hasNext()) {

                IntWritable u = td.next();
                double sum = u+v;
                word.set( u + " + " + v);
                output.collect(word, new DoubleWritable(sum));
            }
        }
    }
}

And I am trying to create two copies of the Iterator variable so that I can go through all the values of the second iterator while I get a single value from the previous Iterator( Two while loops above) but the two iterators hold the same value all the time.

I am not sure if this is the right way to do it.

like image 670
tkt986 Avatar asked Aug 14 '10 03:08

tkt986


2 Answers

The iterators in the reducer are not as simple as you might think.

The issue is that the total number of items that you are iterating through might not fit into memory. That means that the iterator may be reading from disk. If you have two independent copies of the iterator, then you can have one of them far ahead of the other which implies that the data between where the two iterators point can't be dropped.

For simplicity of implementation, Hadoop doesn't support having more than one iterator for the reduce values.

The practical impact of this is that you can't go through the same iterator twice. That isn't nice, but it is the case. If you absolutely know that the number of items will fit into memory, then you can copy all the items into a list as suggested by MrGomez. If you don't know that, you may have to use secondary storage.

The better approach is to redesign your program so that you don't need unbounded storage in the reducer. This can get a bit tricky, but there are standard approaches to the problem.

For your particular problem, you have a quadratic growth in output size relative to the largest reduce input set. This is usually a really bad idea. In most cases you don't need ALL pairs, just the most important pairs. If you can trim the set of pairs in some way, then you are all set and you may be able to remove the all pairs constraint.

For instance, if you are trying to find the 100 pairs with the largest sum for each reduce set, you can keep a priority queue with the 100 largest inputs seen so far and a priority queue with the 100 largest sums seen so far. For each new input, you can form the sum with the largest 100 numbers seen so far and try to stick those sums into the second queue. Finally, you should stick the new input into the first queue and trim both queues to 100 elements by deleting the smallest values (if necessary). In the close method of the reduce, you should dump the priority queue. This approach guarantees that you only need min(n^2, 200) elements of storage which avoids the n^2 problem and avoids the double pass through the input by keeping the 100 largest items seen rather than all items seen.

like image 87
Ted Dunning Avatar answered Oct 03 '22 15:10

Ted Dunning


I'm not sure exactly what you're trying to accomplish, but I know this much: the behavior of Hadoop's Iterators is a bit strange. Calling Iterator.next() will always return the SAME EXACT instance of IntWritable, with the contents of that instance replaced with the next value. So holding a reference to the IntWritable across calls to Iterator.next() is almost always a mistake. I believe this behavior is by design to reduce the amount of object creation and GC overhead.

One way to get around this is to use WritableUtils.clone() to clone the instance you're trying to preserve across calls to Iterator.next().

like image 27
bajafresh4life Avatar answered Oct 03 '22 14:10

bajafresh4life