I've been looking around for days for a way to use reduced data for further mapping in Hadoop. I've got objects of class A as input data and objects of class B as output data. The problem is that mapping generates not only Bs but new As as well.
Here's what I'd like to achieve:
1.1 input: a list of As
1.2 map result: for each A a list of new As and a list of Bs is generated
1.3 reduce: filtered Bs are saved as output, filtered As are added to the map jobs
2.1 input: a list of As produced by the first map/reduce
2.2 map result: for each A a list of new As and a list of Bs is generated
2.3 ...
3.1 ...
You should get the basic idea.
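To make the rounds concrete, here is a minimal plain-Java model of the loop, with no Hadoop involved. Everything in it is a hypothetical stand-in chosen for illustration: an A is an Integer, a B is a String, the map step and both filters are made up, and the names FeedbackLoopSketch, MapResult, map and run are not from any library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Plain-Java model of the rounds above; no Hadoop involved.
// Hypothetical stand-ins: an "A" is an Integer, a "B" is a String.
final class FeedbackLoopSketch {

    // Result of mapping one A: the newly generated As and Bs.
    static final class MapResult {
        final List<Integer> newAs = new ArrayList<>();
        final List<String> bs = new ArrayList<>();
    }

    // Hypothetical map step: every A yields one B, and a positive A yields a smaller A.
    static MapResult map(int a) {
        MapResult r = new MapResult();
        r.bs.add("B" + a);
        if (a > 0) {
            r.newAs.add(a - 1);
        }
        return r;
    }

    // Runs rounds until no As remain or maxRounds is reached; returns the saved Bs.
    static List<String> run(List<Integer> initialAs, int maxRounds) {
        Set<Integer> seenAs = new LinkedHashSet<>(initialAs);
        Set<String> savedBs = new LinkedHashSet<>();
        List<Integer> input = new ArrayList<>(seenAs);

        for (int round = 0; round < maxRounds && !input.isEmpty(); round++) {
            List<Integer> nextInput = new ArrayList<>();
            for (int a : input) {
                MapResult r = map(a);
                // "Reduce", part 1: filter the Bs (here: drop duplicates) and save them.
                savedBs.addAll(r.bs);
                // "Reduce", part 2: filter the As (here: drop already-seen ones)
                // and feed the survivors into the next round.
                for (int newA : r.newAs) {
                    if (seenAs.add(newA)) {
                        nextInput.add(newA);
                    }
                }
            }
            input = nextInput;
        }
        return new ArrayList<>(savedBs);
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(2, 1), 10)); // prints [B2, B1, B0]
    }
}
```

In a real Hadoop setup each pass of the outer loop would presumably be one job run, with the filtered As of one run written somewhere the next run can read them as input. That mapping is my assumption about how the pieces would fit, not something I have working.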
I've read a lot about chaining but I'm not sure how to combine ChainReducer and ChainMapper or even if this would be the right approach.
So here's my question: how can I split the mapped data while reducing, so that one part is saved as output and the other part becomes new input data?
Try using MultipleOutputs. As its Javadoc says:
The MultipleOutputs class simplifies writing output data to multiple outputs
Case one: writing to additional outputs other than the job default output. Each additional output, or named output, may be configured with its own OutputFormat, with its own key class and with its own value class.
Case two: to write data to different files provided by user
Usage pattern for job submission:
Job job = Job.getInstance();  // new Job() is deprecated in Hadoop 2+
FileInputFormat.addInputPath(job, inDir);  // the new-API FileInputFormat has no setInputPath
FileOutputFormat.setOutputPath(job, outDir);
job.setMapperClass(MOMap.class);
job.setReducerClass(MOReduce.class);
...
// Defines additional single text based output 'text' for the job
MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
    LongWritable.class, Text.class);
// Defines additional sequence-file based output 'seq' for the job
MultipleOutputs.addNamedOutput(job, "seq",
    SequenceFileOutputFormat.class,
    LongWritable.class, Text.class);
...
job.waitForCompletion(true);
...
Usage in Reducer:
public class MOReduce extends
    Reducer<WritableComparable, Writable, WritableComparable, Writable> {
  private MultipleOutputs<WritableComparable, Writable> mos;

  // Builds a file base name from the key and value
  private String generateFileName(WritableComparable k, Writable v) {
    return k.toString() + "_" + v.toString();
  }

  @Override
  public void setup(Context context) {
    ...
    mos = new MultipleOutputs<>(context);
  }

  @Override
  public void reduce(WritableComparable key, Iterable<Writable> values,
      Context context)
      throws IOException, InterruptedException {
    ...
    // Named output "text", default file name
    mos.write("text", key, new Text("Hello"));
    // Named output "seq", with explicit base output paths
    mos.write("seq", new LongWritable(1), new Text("Bye"), "seq_a");
    mos.write("seq", new LongWritable(2), new Text("Chau"), "seq_b");
    // Job's default output format, custom file name
    mos.write(key, new Text("value"), generateFileName(key, new Text("value")));
    ...
  }

  @Override
  public void cleanup(Context context) throws IOException, InterruptedException {
    mos.close();
    ...
  }
}