MapReduce example

I was reading about MapReduce and I was wondering about a particular scenario. Let's say we have a few files (fileA, fileB, fileC, for example), each consisting of multiple integers. If we wanted to sort the numbers from all the files to create something like this:

23 fileA
34 fileB
35 fileA
60 fileA
60 fileC

how would the map and reduce process work?

Currently, this is what I have, but it is not quite correct:

  1. Map: (fileName, fileContent) -> (number, fileName)

  2. Sort the intermediate key/value pairs and get (number, {fileName1, fileName2, ...})

  3. Reduce the intermediate pairs and get

     (number, fileName1)
     (number, fileName2)

     and so on

The problem is that during the sorting phase, the filenames may not be in alphabetical order, so the reduce step will not generate the correct output. Could someone provide some insight into the correct approach for this scenario?

asked Oct 19 '22 by Dobby

1 Answer

The best way to achieve this is through a secondary sort. You need to sort both the keys (in your case, the numbers) and the values (in your case, the file names). In Hadoop, the mapper output is sorted only on keys.

This can be achieved by using a composite key: a key that combines both the number and the file name. For example, for the first record the key would be (23, fileA) instead of just (23).
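For illustration, a minimal sketch of such a composite key could look like the following (the class name NumberFileKey is my own invention and not part of the program further below):

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Hypothetical composite key: orders records by number first, then by file name.
public class NumberFileKey implements WritableComparable<NumberFileKey> {

    private int number;
    private String fileName;

    // No-arg constructor is required by Hadoop for deserialization.
    public NumberFileKey() {
    }

    public NumberFileKey(int number, String fileName) {
        this.number = number;
        this.fileName = fileName;
    }

    public void write(DataOutput out) throws IOException {
        out.writeInt(number);
        out.writeUTF(fileName);
    }

    public void readFields(DataInput in) throws IOException {
        number = in.readInt();
        fileName = in.readUTF();
    }

    public int compareTo(NumberFileKey other) {
        // Primary sort on the number; break ties alphabetically on the file name.
        int cmp = Integer.compare(number, other.number);
        return (cmp != 0) ? cmp : fileName.compareTo(other.fileName);
    }

    public int hashCode() {
        // Hash on the number only, so a hash partitioner sends equal numbers
        // to the same reducer.
        return number;
    }

    public boolean equals(Object o) {
        if (!(o instanceof NumberFileKey)) {
            return false;
        }
        NumberFileKey that = (NumberFileKey) o;
        return number == that.number && fileName.equals(that.fileName);
    }
}

With such a key, the shuffle itself delivers the pairs in the desired order; a complete secondary-sort job would additionally set a partitioner and a grouping comparator (via job.setPartitionerClass() and job.setGroupingComparatorClass()) that look only at the number part. The program below takes a simpler route and sorts the values on the reducer side instead.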

You can read about secondary sort here: https://www.safaribooksonline.com/library/view/data-algorithms/9781491906170/ch01.html

You can also go through the "Secondary Sort" section in the book "Hadoop: The Definitive Guide".

For the sake of simplicity, I have written a program that achieves the same result.

In this program, the keys are sorted by the framework by default, and I have added logic to sort the values on the reducer side. So it takes care of sorting both keys and values and produces the desired output.

Following is the program:

package com.myorg.hadooptests;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.*;

public class SortedValue {


    public static class SortedValueMapper
            extends Mapper<LongWritable, Text , Text, IntWritable>{

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // Each input line is expected to look like "23 fileA": a number and a
            // file name, separated by a single space.
            String[] tokens = value.toString().split(" ");

            if(tokens.length == 2) {
                // Emit (fileName, number), so all numbers of a file reach the same reduce call.
                context.write(new Text(tokens[1]), new IntWritable(Integer.parseInt(tokens[0])));
            }
        }
    }

    public static class SortedValueReducer
            extends Reducer<Text, IntWritable, IntWritable, Text> {

        Map<String, ArrayList<Integer>> valueMap = new HashMap<String, ArrayList<Integer>>();

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context) throws IOException, InterruptedException {

            // Collect all the numbers seen for this file name.
            String keyStr = key.toString();
            ArrayList<Integer> storedValues = valueMap.get(keyStr);
            if (storedValues == null) {
                storedValues = new ArrayList<Integer>();
                valueMap.put(keyStr, storedValues);
            }

            for (IntWritable value : values) {
                storedValues.add(value.get());
            }

            // Sort this file's numbers and emit them as (number, fileName) pairs.
            Collections.sort(storedValues);
            for (Integer val : storedValues) {
                context.write(new IntWritable(val), key);
            }
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "CompositeKeyExample");
        job.setJarByClass(SortedValue.class);
        job.setMapperClass(SortedValueMapper.class);
        job.setReducerClass(SortedValueReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        // Input and output paths are hardcoded here for simplicity.
        FileInputFormat.addInputPath(job, new Path("/in/in1.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/out/"));

        System.exit(job.waitForCompletion(true) ? 0:1);

    }
}

Mapper Logic:

  1. Parses each line, assuming that the key and value are separated by a blank character (" ").
  2. If the line contains 2 tokens, it emits (fileName, number). For example, for the first record it emits (fileA, 23).

Reducer Logic:

  1. It puts the (key, value) pairs in a HashMap, where the key is the file name and the value is the list of integers for that file. For example, for fileA the stored values will be 23, 35 and 60.

  2. Finally, it sorts the values for a particular key and, for each value, emits (value, key) from the reducer. For example, for fileA the records output are (23, fileA), (35, fileA) and (60, fileA).
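If you want to try the program yourself, launching it would look roughly like this (the jar name sortedvalue.jar is just a placeholder; the input must sit at /in/in1.txt on HDFS, since the driver hardcodes that path):

# stage the input where the driver expects it
hadoop fs -put in1.txt /in/in1.txt
# sortedvalue.jar is a placeholder name; use whatever jar you build
hadoop jar sortedvalue.jar com.myorg.hadooptests.SortedValue
# default output file of the single reducer
hadoop fs -cat /out/part-r-00000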

I ran this program for the following input:

34 fileB
35 fileA
60 fileC
60 fileA
23 fileA

I got the following output:

23      fileA
35      fileA
60      fileA
34      fileB
60      fileC
answered Oct 27 '22 by Manjunath Ballur