
Split reduced data into output and new input in Hadoop

I've been looking around for days trying to find a way to use reduced data for further mapping in Hadoop. I've got objects of class A as input data and objects of class B as output data. The problem is that mapping generates not only Bs but new As as well.

Here's what I'd like to achieve:

1.1 input: a list of As
1.2 map result: for each A a list of new As and a list of Bs is generated
1.3 reduce: filtered Bs are saved as output, filtered As are added to the map jobs

2.1 input: a list of As produced by the first map/reduce
2.2 map result: for each A a list of new As and a list of Bs is generated
2.3 ...

3.1 ...

You should get the basic idea.

I've read a lot about chaining, but I'm not sure how to combine ChainReducer and ChainMapper, or even whether this would be the right approach.

So here's my question: how can I split the mapped data while reducing, so that one part is saved as output and the other part becomes new input data?

Mennny asked Nov 04 '22 06:11


1 Answer

Try using MultipleOutputs. As its Javadoc says:

The MultipleOutputs class simplifies writing output data to multiple outputs.

Case one: writing to additional outputs other than the job default output. Each additional output, or named output, may be configured with its own OutputFormat, with its own key class and with its own value class.

Case two: writing data to different files whose names are provided by the user.

Usage pattern for job submission:

 Job job = Job.getInstance();

 FileInputFormat.setInputPaths(job, inDir);
 FileOutputFormat.setOutputPath(job, outDir);

 job.setMapperClass(MOMap.class);
 job.setReducerClass(MOReduce.class);
 ...

 // Defines an additional text-based output named 'text' for the job
 MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
   LongWritable.class, Text.class);

 // Defines an additional sequence-file-based output named 'seq' for the job
 MultipleOutputs.addNamedOutput(job, "seq",
   SequenceFileOutputFormat.class,
   LongWritable.class, Text.class);
 ...

 job.waitForCompletion(true);
 ...

Usage in Reducer:

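 public class MOReduce extends
     Reducer<WritableComparable, Writable, WritableComparable, Writable> {

   private MultipleOutputs<WritableComparable, Writable> mos;

   // Builds a per-record base file name for the last write() call below
   private String generateFileName(WritableComparable k, Writable v) {
     return k.toString() + "_" + v.toString();
   }

   @Override
   public void setup(Context context) {
     ...
     mos = new MultipleOutputs<>(context);
   }

   @Override
   public void reduce(WritableComparable key, Iterable<Writable> values,
       Context context) throws IOException, InterruptedException {
     ...
     // Named output "text", default file name
     mos.write("text", key, new Text("Hello"));
     // Named output "seq", explicit base file names
     mos.write("seq", new LongWritable(1), new Text("Bye"), "seq_a");
     mos.write("seq", new LongWritable(2), new Text("Chau"), "seq_b");
     // Job default output format, file name derived from key and value
     mos.write(key, new Text("value"), generateFileName(key, new Text("value")));
     ...
   }

   @Override
   public void cleanup(Context context) throws IOException, InterruptedException {
     // Flushes and closes every output opened through mos
     mos.close();
     ...
   }
 }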
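
To connect this back to the loop in the question: one way to read it is that every round is its own job, where the reducer sends the Bs to the final output and the As to a separate output that the next job reads. The plain-Java sketch below only models that control flow and uses no Hadoop classes at all. The names IterativeRounds, RoundResult, runRounds and countdown are hypothetical, made up for illustration, and the toy step stands in for whatever your real map/reduce logic does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Plain-Java model of the iterative job chain; no Hadoop classes are used. */
class IterativeRounds {

    /** What processing one A yields: new inputs (As) and final outputs (Bs). */
    static final class RoundResult<A, B> {
        final List<A> nextInputs = new ArrayList<>();
        final List<B> outputs = new ArrayList<>();
    }

    /**
     * Runs rounds until a round yields no new As or maxRounds is reached.
     * Assumption: in Hadoop, each loop iteration would be one Job.
     */
    static <A, B> List<B> runRounds(List<A> initial,
                                    Function<A, RoundResult<A, B>> step,
                                    int maxRounds) {
        List<B> allOutputs = new ArrayList<>();
        List<A> frontier = new ArrayList<>(initial);
        for (int round = 0; round < maxRounds && !frontier.isEmpty(); round++) {
            List<A> next = new ArrayList<>();
            for (A a : frontier) {
                RoundResult<A, B> r = step.apply(a);
                next.addAll(r.nextInputs);   // becomes the input of the next round
                allOutputs.addAll(r.outputs); // saved as final output
            }
            frontier = next;
        }
        return allOutputs;
    }

    /** Toy step (hypothetical): n emits "B" + n and, while n > 1, the new input n - 1. */
    static RoundResult<Integer, String> countdown(Integer n) {
        RoundResult<Integer, String> r = new RoundResult<>();
        r.outputs.add("B" + n);
        if (n > 1) {
            r.nextInputs.add(n - 1);
        }
        return r;
    }

    public static void main(String[] args) {
        // Prints [B3, B1, B2, B1]
        System.out.println(runRounds(Arrays.asList(3, 1), IterativeRounds::countdown, 10));
    }
}
```

In a real driver, the loop body would presumably configure a new Job per round, point its input at the directory where the previous round's reducer wrote its As (for example through a named output or a base output path), and stop once that directory comes back empty. That wiring is my assumption, not something the Javadoc above prescribes, and the maxRounds cap is only there so that a step which always produces new As cannot loop forever.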

Amar answered Nov 14 '22 22:11