
Is it possible to send part of the mapper output to the reducer, while just writing the other part to HDFS, in Hadoop?

I want to write part of the mapper output to a folder, say folder A, in HDFS. I want the other part of the output to be processed by the reducer. Is this possible? I am aware of MultipleOutputs. Is this possible using MultipleOutputs?

Thanks!

asked Oct 07 '12 by Mahalakshmi Lakshminarayanan

2 Answers

Yes, it is possible with MultipleOutputs. According to the docs, any output passed via MultipleOutputs during the map stage is ignored by the reducer, so this is exactly what you want. I wrote a small example on my GitHub and I hope you'll find it useful.
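For reference, here is a minimal sketch of what that looks like with the new mapreduce API. The SplittingMapper class name, the belongsInFolderA predicate, the key/value types, and the "A/part" base path are illustrative assumptions, not taken from the linked example:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class SplittingMapper extends Mapper<LongWritable, Text, Text, Text> {

    private MultipleOutputs<Text, Text> mos;

    @Override
    protected void setup(Context context) {
        mos = new MultipleOutputs<>(context);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        if (belongsInFolderA(value)) {
            // Written under <job-output-dir>/A/part-m-* and never seen by the reducer
            mos.write(new Text("side"), value, "A/part");
        } else {
            // Normal path: shuffled to the reducer as usual
            context.write(new Text("main"), value);
        }
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        mos.close(); // flush and close the side-output files
    }

    private boolean belongsInFolderA(Text value) {
        return value.toString().startsWith("A"); // placeholder predicate
    }
}

Note that the base output path passed to mos.write is resolved relative to the job's output directory, so "folder A" ends up underneath it; anything emitted through context.write still flows to the reducer as normal.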

answered by rystsov


You can just write output directly to HDFS from your mapper implementation: create a FileSystem object using the context's configuration, then create a file, write to it, and remember to close it:

@Override
public void cleanup(Context context) throws IOException {
    // The default FileSystem (HDFS) for this job's configuration
    FileSystem fs = FileSystem.get(context.getConfiguration());
    // Create /path/to/output/map-output and write to it
    PrintStream ps = new PrintStream(fs.create(
        new Path("/path/to/output", "map-output")));
    ps.println("test");
    ps.close();
}

Other things to consider: each file in HDFS needs a unique name, so you could suffix the filename with the mapper's task ID number, but you also need to account for speculative execution (in that two instances of your mapper task may be running in two locations, both trying to write to the same file in HDFS).

You're normally abstracted away from this, as the Output Committer creates files in a tmp HDFS directory named with the task ID and attempt number, only moving them to the correct location and file name upon commit of that task attempt. There's no way around this problem when writing map-side (data normally goes to the local file system) without either turning off speculative execution or creating multiple files in HDFS, one for each attempt.
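If you go the first route, here is a minimal sketch of turning map-side speculative execution off in the driver; the property name below is the Hadoop 2.x one (Hadoop 1.x uses mapred.map.tasks.speculative.execution), and the job name is illustrative:

// Ensure only one attempt of each map task runs, so a single
// writer owns each side file in HDFS
Configuration conf = new Configuration();
conf.setBoolean("mapreduce.map.speculative", false);
Job job = Job.getInstance(conf, "map-side-hdfs-output");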

So a more 'complete' solution would look like:

FileSystem fs = FileSystem.get(context.getConfiguration());
// Encode both the task ID and the attempt number in the filename so
// two speculative attempts of the same task never collide on a path
PrintStream ps = new PrintStream(fs.create(new Path(
        "/path/to/output", String.format("map-output-%05d-%d",
        context.getTaskAttemptID().getTaskID().getId(),
        context.getTaskAttemptID().getId()))));
ps.println("test");
ps.close();

MultipleOutputs would help you on the reduce side, but I don't think it would work map-side, as there is no output committer and the work directory is not in HDFS.

Of course, if this were a mapper-only job, then MultipleOutputs would work. So an alternative approach would be to run a map-only job, and then use the desired portion of the output in a secondary job (with an identity mapper); it depends on how much data you're moving, I guess. A rough sketch of that chain follows.
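Here is one way the two-job chain might be wired up in a driver. The class names (Driver, SplittingMapper, MyReducer), the paths, and the assumption that the reducer-bound records land as part-m-* files directly in the staging directory are all illustrative:

Configuration conf = new Configuration();

// Job 1: map-only (zero reducers), so MultipleOutputs works; the
// side output lands under /staging/A and the rest becomes part-m-*
Job first = Job.getInstance(conf, "map-only-split");
first.setJarByClass(Driver.class);
first.setMapperClass(SplittingMapper.class);   // hypothetical mapper
first.setNumReduceTasks(0);                    // map-only
FileInputFormat.addInputPath(first, new Path("/input"));
FileOutputFormat.setOutputPath(first, new Path("/staging"));
first.waitForCompletion(true);

// Job 2: the base Mapper class passes records through unchanged
// (an identity mapper), feeding only the reducer-bound portion on
Job second = Job.getInstance(conf, "reduce-pass");
second.setJarByClass(Driver.class);
second.setMapperClass(Mapper.class);           // identity mapper
second.setReducerClass(MyReducer.class);       // hypothetical reducer
FileInputFormat.addInputPath(second, new Path("/staging/part-m-*"));
FileOutputFormat.setOutputPath(second, new Path("/final"));
second.waitForCompletion(true);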

answered by Chris White