I wrote a simple MapReduce job that reads data from the DFS and runs a simple algorithm on it. While trying to debug it, I decided to simply make the mappers output a single set of keys and values and the reducers output an entirely different set. I am running this job on a single-node Hadoop 0.20.2 cluster. When the job finishes, the output contains only the values emitted by the mappers, which leads me to believe that the reducer is never run. I would greatly appreciate any insight into why my code produces this output. I have tried setting outputKeyClass and outputValueClass to different things, as well as setMapOutputKeyClass and setMapOutputValueClass. The commented-out sections of code are the algorithm that I am actually running, but I have changed the map and reduce methods to simply output certain values. Once again, the output from the job contains only the values emitted by the mapper. Here is the class I use to run the job:
import java.io.IOException;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class CalculateHistogram {

    public static class HistogramMap extends Mapper<LongWritable, Text, LongWritable, Text> {

        private static final int R = 100;
        private int n = 0;

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            if (n == 0) {
                StringTokenizer tokens = new StringTokenizer(value.toString(), ",");
                int counter = 0;
                while (tokens.hasMoreTokens()) {
                    String token = tokens.nextToken();
                    if (tokens.hasMoreTokens()) {
                        context.write(new LongWritable(-2), new Text("HI"));
                        //context.write(new LongWritable(counter), new Text(token));
                    }
                    counter++;
                    n++;
                }
            } else {
                n++;
                if (n == R) {
                    n = 0;
                }
            }
        }
    }

    public static class HistogramReduce extends Reducer<LongWritable, Text, LongWritable, HistogramBucket> {

        private final static int R = 10;

        public void reduce(LongWritable key, Iterator<Text> values, Context context)
                throws IOException, InterruptedException {
            if (key.toString().equals("-1")) {
                //context.write(key, new HistogramBucket(key));
            }
            Text t = values.next();
            for (char c : t.toString().toCharArray()) {
                if (!Character.isDigit(c) && c != '.') {
                    //context.write(key, new HistogramBucket(key)); // if this isn't a numerical attribute we ignore it
                }
            }
            context.setStatus("Building Histogram");
            HistogramBucket i = new HistogramBucket(key);
            i.add(new DoubleWritable(Double.parseDouble(t.toString())));
            while (values.hasNext()) {
                for (int j = 0; j < R; j++) {
                    t = values.next();
                }
                if (!i.contains(Double.parseDouble(t.toString()))) {
                    context.setStatus("Writing a value to the Histogram");
                    i.add(new DoubleWritable(Double.parseDouble(t.toString())));
                }
            }
            context.write(new LongWritable(55555555), new HistogramBucket(new LongWritable(55555555)));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "MRDT - Generate Histogram");
        job.setJarByClass(CalculateHistogram.class);
        job.setMapperClass(HistogramMap.class);
        job.setReducerClass(HistogramReduce.class);
        //job.setOutputValueClass(HistogramBucket.class);
        //job.setMapOutputKeyClass(LongWritable.class);
        //job.setMapOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
The signature of your reduce method is wrong. Your method declares an Iterator<Text> parameter; it has to take an Iterable<Text>.

Because of that, your code does not override the reduce method of the Reducer base class, so the default implementation provided by the base class is used instead. That implementation is an identity function, which is why the mapper output appears unchanged in the final output.

Use the @Override annotation to catch errors like this one at compile time.
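For reference, here is a rough sketch of a reducer whose reduce method actually overrides the base class. It uses a plain Text output value and a placeholder body that just counts values, instead of your HistogramBucket logic, purely so the snippet compiles on its own:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class HistogramReduce extends Reducer<LongWritable, Text, LongWritable, Text> {

    // @Override makes the compiler reject any signature that does not match the
    // base class, e.g. one taking an Iterator<Text> instead of an Iterable<Text>.
    @Override
    public void reduce(LongWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // The framework hands the grouped values to reduce as an Iterable,
        // so iterate with a for-each loop instead of calling next() yourself.
        long count = 0;
        for (Text value : values) {
            count++;
        }
        context.write(key, new Text("saw " + count + " values"));
    }
}

Note that once the reducer actually runs, the commented-out setOutputValueClass(HistogramBucket.class) and setMapOutputValueClass(Text.class) calls in main will most likely be needed as well, since your map output value type (Text) differs from the reducer's output value type (HistogramBucket).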