This may seem like a stupid question, but I fail to see the problem in my types in my mapreduce code for hadoop
As stated in the question the problem is that it is expecting IntWritable but I'm passing it a Text object in the collector.collect of the reducer.
My job configuration has the following mapper output classes:
conf.setMapOutputKeyClass(IntWritable.class);
conf.setMapOutputValueClass(IntWritable.class);
And the following reducer output classes:
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
My mapping class has the following definition:
public static class Reduce extends MapReduceBase implements Reducer<IntWritable, IntWritable, Text, IntWritable>
with the required function:
public void reduce(IntWritable key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter)
And then it fails when I call:
output.collect(new Text(),new IntWritable());
I'm fairly new to map reduce but all the types seem to match, it compiles but then fails on that line saying its expecting an IntWritable as the key for the reduce class. If it matters I'm using 0.21 version of Hadoop
Here is my map class:
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, IntWritable> {
private IntWritable node = new IntWritable();
private IntWritable edge = new IntWritable();
public void map(LongWritable key, Text value, OutputCollector<IntWritable, IntWritable> output, Reporter reporter) throws IOException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
node.set(Integer.parseInt(tokenizer.nextToken()));
edge.set(Integer.parseInt(tokenizer.nextToken()));
if(node.get() < edge.get())
output.collect(node, edge);
}
}
}
and my reduce class:
public static class Reduce extends MapReduceBase implements Reducer<IntWritable, IntWritable, Text, IntWritable> {
IntWritable $ = new IntWritable(Integer.MAX_VALUE);
Text keyText = new Text();
public void reduce(IntWritable key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
ArrayList<IntWritable> valueList = new ArrayList<IntWritable>();
//outputs original edge pair as key and $ for value
while (values.hasNext()) {
IntWritable value = values.next();
valueList.add(value);
keyText.set(key.get() + ", " + value.get());
output.collect(keyText, $);
}
//outputs all the 2 length pairs
for(int i = 0; i < valueList.size(); i++)
for(int j = i+1; i < valueList.size(); j++)
output.collect(new Text(valueList.get(i).get() + ", " + valueList.get(j).get()), key);
}
}
and my job configuration:
JobConf conf = new JobConf(Triangles.class);
conf.setJobName("mapred1");
conf.setMapOutputKeyClass(IntWritable.class);
conf.setMapOutputValueClass(IntWritable.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path("mapred1"));
JobClient.runJob(conf);
Your problem is that you set the Reduce class as a combiner
conf.setCombinerClass(Reduce.class);
Combiners run in the map phase and they need to emit the same key/value type (IntWriteable, IntWritable in your case) remove this line and you should be ok
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With