Find the average of numbers using MapReduce

I have been trying to write some code to find the average of numbers using MapReduce.

I am trying to use global counters to reach my goal, but I am not able to set the counter value in the map method of my Mapper, and I am also not able to retrieve the counter value in the reduce method of my Reducer.

Do I have to use a global counter in the map phase at all (e.g. by using incrCounter(key, amount) of the provided Reporter)? Or would you suggest different logic to get the average of some numbers?

asked May 19 '12 by Amnesiac

2 Answers

The arithmetic mean is an aggregate function which is not distributive but algebraic. According to Han et al. an aggregate function is distributive if:

[...] it can be computed [...] as follows. Suppose [..] data are partitioned into n sets. We apply the function to each partition, resulting in n aggregate values. If the result derived by applying the function to the n aggregate values is the same as that derived by applying the function to the entire data set (without partitioning), the function can be computed in a distributed manner.

Or in other words, it has to be associative and commutative. An aggregate function, however, is algebraic according to Han et al. if:

[...] it can be computed by an algebraic function with m arguments (where m is a bounded positive integer), each of which is obtained by applying a distributive aggregate function.
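
To make the distinction concrete: split the numbers 1, 2, 3, 4, 5, 6 into the partitions {1, 2} and {3, 4, 5, 6}. The mean of the per-partition means is (1.5 + 4.5) / 2 = 3, but the mean of the whole set is 21 / 6 = 3.5, so the mean itself is not distributive. The sum and the count, however, are distributive, and combining them as sum/count is exactly what makes the mean algebraic.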

For the arithmetic mean this is just avg = sum/count. Obviously you need to carry the count along in addition to the sum. But using a global counter for that seems to be a misuse. The API describes org.apache.hadoop.mapreduce.Counter as follows:

A named counter that tracks the progress of a map/reduce job.

Counters are typically meant for statistics about the job itself, not for values that are part of the calculation during the data processing.

So all you have to do within a partition is add your numbers up and track their count together with the sum, i.e. a pair (sum, count); a simple encoding is a string like <sum><separator><count>.
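
As a standalone illustration of how those (sum, count) atoms compose (plain Java without Hadoop; the class and method names are made up for this sketch):

// Merging "sum:count" atoms from two partitions yields the same mean
// as computing it over the whole data set at once.
public class AggregateDemo {

    // merge two partial aggregates encoded as "<sum>:<count>"
    static String merge(String left, String right) {
        String[] a = left.split(":");
        String[] b = right.split(":");
        long sum = Long.parseLong(a[0]) + Long.parseLong(b[0]);
        long count = Long.parseLong(a[1]) + Long.parseLong(b[1]);
        return sum + ":" + count;
    }

    public static void main(String[] args) {
        String partition1 = "3:2";   // numbers 1, 2  -> sum = 3,  count = 2
        String partition2 = "18:4";  // numbers 3..6  -> sum = 18, count = 4

        String total = merge(partition1, partition2); // "21:6"
        String[] atom = total.split(":");
        double mean = (double) Long.parseLong(atom[0]) / Long.parseLong(atom[1]);

        System.out.println(mean); // 3.5, the mean of 1, 2, 3, 4, 5, 6
    }
}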

In the mapper the count is always 1 and the sum is the raw value itself. To shrink the map output early you can use a combiner and merge the aggregates as (sum_1 + ... + sum_n, count_1 + ... + count_n). The reducer repeats this merge and finishes with the final calculation sum/count. Keep in mind that this approach is independent of the key that is used!

Finally, here is a simple example using the raw crime statistics of the LAPD which calculates the "average crime time" in Los Angeles, keyed by year:

// Note: each public class below belongs in its own source file;
// the required imports are listed once here to keep the listing short.
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Driver extends Configured implements Tool {
    enum Counters {
        DISCARDED_ENTRY
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Driver(), args);
    }

    public int run(String[] args) throws Exception {
        Configuration configuration = getConf();

        Job job = Job.getInstance(configuration);
        job.setJarByClass(Driver.class);

        job.setMapperClass(Mapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setCombinerClass(Combiner.class);
        job.setReducerClass(Reducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : -1;
    }
}

public class Mapper extends org.apache.hadoop.mapreduce.Mapper<
    LongWritable,
    Text,
    LongWritable,
    Text
> {

    @Override
    protected void map(
        LongWritable key,
        Text value,
        org.apache.hadoop.mapreduce.Mapper<
            LongWritable,
            Text,
            LongWritable,
            Text
        >.Context context
    ) throws IOException, InterruptedException {
            // parse the CSV line
            ArrayList<String> values = this.parse(value.toString());

            // validate the parsed values
            if (this.isValid(values)) {

                // fetch the third and the fourth column
                String time = values.get(3);
                String year = values.get(2)
                    .substring(values.get(2).length() - 4);

                // convert time to minutes (e.g. 1542 -> 942)
                int minutes = Integer.parseInt(time.substring(0, 2))
                    * 60 + Integer.parseInt(time.substring(2,4));

                // create the aggregate atom (a/n)
                // with a = time in minutes and n = 1
                context.write(
                    new LongWritable(Integer.parseInt(year)),
                    new Text(Integer.toString(minutes) + ":1")
                );
            } else {
                // invalid line format, so we increment a counter
                context.getCounter(Driver.Counters.DISCARDED_ENTRY)
                    .increment(1);
            }
    }

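    // a record is considered valid if it has at least four columns,
    // a 10-character date in the third column and a 4-digit time (HHMM) in the fourth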
    protected boolean isValid(ArrayList<String> values) {
        return values.size() > 3 
            && values.get(2).length() == 10 
            && values.get(3).length() == 4;
    }

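    // minimal CSV parser: splits on commas, but keeps commas that appear inside double-quoted fields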
    protected ArrayList<String> parse(String line) {
        ArrayList<String> values = new ArrayList<>();
        String current = "";
        boolean escaping = false;

        for (int i = 0; i < line.length(); i++){
            char c = line.charAt(i);

            if (c == '"') {
                escaping = !escaping;
            } else if (c == ',' && !escaping) {
                values.add(current);
                current = "";
            } else {
                current += c;
            }
        }

        values.add(current);

        return values;
    }
}

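// The combiner merges the map-side "sum:count" atoms per key to reduce the amount of shuffled data.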
public class Combiner extends org.apache.hadoop.mapreduce.Reducer<
    LongWritable,
    Text,
    LongWritable,
    Text
> {

    @Override
    protected void reduce(
        LongWritable key,
        Iterable<Text> values,
        Context context
    ) throws IOException, InterruptedException {
        long n = 0L;
        long a = 0L;
        Iterator<Text> iterator = values.iterator();

        // calculate intermediate aggregates
        while (iterator.hasNext()) {
            String[] atom = iterator.next().toString().split(":");
            a += Long.parseLong(atom[0]);
            n += Long.parseLong(atom[1]);
        }

        context.write(key, new Text(Long.toString(a) + ":" + Long.toString(n)));
    }
}

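// The reducer merges all remaining "sum:count" atoms per key and computes the final average.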
public class Reducer extends org.apache.hadoop.mapreduce.Reducer<
    LongWritable,
    Text,
    LongWritable,
    Text
> {

    @Override
    protected void reduce(
        LongWritable key, 
        Iterable<Text> values, 
        Context context
    ) throws IOException, InterruptedException {
        long n = 0L;
        long a = 0L;
        Iterator<Text> iterator = values.iterator();

        // calculate the final aggregate
        while (iterator.hasNext()) {
            String[] atom = iterator.next().toString().split(":");
            a += Long.parseLong(atom[0]);
            n += Long.parseLong(atom[1]);
        }

        // average time in whole minutes (integer division would truncate,
        // so round on a floating point division instead)
        int average = (int) Math.round((double) a / n);

        // convert the average minutes back to time
        context.write(
            key,
            new Text(
                // zero-pad hours and minutes, e.g. 942 -> "15:42", 65 -> "01:05"
                String.format("%02d:%02d", average / 60, average % 60)
            )
        );
    }
}
answered Oct 29 '22 by witrin

The logic is quite simple: if all the numbers have the same key, the mapper sends all the values you want to average to the reducer under that single key. In the reducer you can then sum the values from the iterator while keeping a counter of how many times the iterator yields a value, which tells you how many items are being averaged. After the loop you get the average by dividing the sum by the number of items.

Be careful: this logic will not work if the combiner class is set to the same class as the reducer, because the combiner would already collapse each group into a single averaged value and the counts needed for a correctly weighted final average would be lost.
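
A minimal sketch of such a reducer, assuming the mapper emits every number as a DoubleWritable under a single constant Text key (the class name and the type choices are illustrative, not from the original answer):

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class AverageReducer
        extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

    @Override
    protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        double sum = 0;
        long count = 0;

        // sum the values and count how many the iterator yields
        for (DoubleWritable value : values) {
            sum += value.get();
            count++;
        }

        // divide the sum by the number of items; do not register this class
        // as the combiner, or the counts are lost before the final division
        if (count > 0) {
            context.write(key, new DoubleWritable(sum / count));
        }
    }
}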

answered Oct 30 '22 by Sibimon Sasidharan