Using Hadoop for the First Time, MapReduce Job does not run Reduce Phase

I wrote a simple MapReduce job that reads data from the DFS and runs a simple algorithm on it. While trying to debug it, I decided to simply make the mappers output a single set of keys and values, and the reducers output an entirely different set. I am running this job on a single-node Hadoop 0.20.2 cluster. When the job finishes, the output contains only the values emitted by the mappers, which leads me to believe that the reducer is not being run. I would greatly appreciate it if anyone could provide any insight as to why my code is producing such output. I have tried setting the outputKeyClass and outputValueClass to different things, as well as the setMapOutputKeyClass and setMapOutputValueClass. Currently the commented-out sections of code are the algorithm I am running, but I have changed the map and reduce methods to simply output certain values. Once again, the output from the job contains only the values emitted by the mapper. Here is the class I use to run the job:

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class CalculateHistogram {

    public static class HistogramMap extends Mapper<LongWritable, Text, LongWritable, Text> {

        private static final int R = 100;
        private int n = 0;

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            if (n == 0) {
                StringTokenizer tokens = new StringTokenizer(value.toString(), ",");
                int counter = 0;
                while (tokens.hasMoreTokens()) {
                    String token = tokens.nextToken();
                    if (tokens.hasMoreTokens()) {
                        context.write(new LongWritable(-2), new Text("HI"));
                        //context.write(new LongWritable(counter), new Text(token));
                    }
                    counter++;
                    n++;
                }
            } else {
                n++;
                if (n == R) {
                    n = 0;
                }
                
            }
        }
    }

    public static class HistogramReduce extends Reducer<LongWritable, Text, LongWritable, HistogramBucket> {

        private final static int R = 10;

        public void reduce(LongWritable key, Iterator<Text> values, Context context)
                                            throws IOException, InterruptedException {
            if (key.toString().equals("-1")) {
                //context.write(key, new HistogramBucket(key));
            }
            Text t = values.next();
            for (char c : t.toString().toCharArray()) {
                if (!Character.isDigit(c) && c != '.') {
                    //context.write(key, new HistogramBucket(key));//if this isnt a numerical attribute we ignore it
                }
            }
            context.setStatus("Building Histogram");
            HistogramBucket i = new HistogramBucket(key);
            i.add(new DoubleWritable(Double.parseDouble(t.toString())));
            while (values.hasNext()) {
                for (int j = 0; j < R; j++) {
                    t = values.next();
                }
                if (!i.contains(Double.parseDouble(t.toString()))) {
                    context.setStatus("Writing a value to the Histogram");
                    i.add(new DoubleWritable(Double.parseDouble(t.toString())));
                }
            }
            
            context.write(new LongWritable(55555555), new HistogramBucket(new LongWritable(55555555)));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "MRDT - Generate Histogram");
        job.setJarByClass(CalculateHistogram.class);
        job.setMapperClass(HistogramMap.class);
        job.setReducerClass(HistogramReduce.class);

        //job.setOutputValueClass(HistogramBucket.class);
        
        //job.setMapOutputKeyClass(LongWritable.class);
        //job.setMapOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
asked Nov 23 '10 by RedbeardTheNinja


1 Answer

The signature of your reduce method is wrong: it takes an Iterator&lt;Text&gt;, but the framework passes an Iterable&lt;Text&gt;.

Because of this mismatch, your code does not override the reduce method of the Reducer base class, so the default implementation provided by the base class is used instead. That implementation is an identity function, which is why the mapper output is passed straight through to the job output.

Add the @Override annotation to your reduce method so the compiler catches errors like this one.
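For reference, here is a minimal sketch of what the corrected reducer could look like. It reuses the imports already present in your class; since HistogramBucket is your own type, the sketch emits Text instead, and the body just concatenates the values so you can see the reducer run. Your actual histogram logic is left out.

public static class HistogramReduce extends Reducer<LongWritable, Text, LongWritable, Text> {

    @Override // the compiler now rejects any signature that does not match Reducer.reduce
    public void reduce(LongWritable key, Iterable<Text> values, Context context)
                                        throws IOException, InterruptedException {
        // Iterable, not Iterator: iterate with a for-each loop
        StringBuilder joined = new StringBuilder();
        for (Text value : values) {
            joined.append(value.toString()).append(' ');
        }
        context.write(key, new Text(joined.toString().trim()));
    }
}

Note that if your reduce output value type differs from your map output value type (as it does with HistogramBucket), you would also need the setMapOutputValueClass and setOutputValueClass calls you already have commented out in main.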

answered Oct 01 '22 by Helmut Zechmann