Multiple Output Files for Hadoop Streaming with Python Mapper




I am looking for a little clarification on the answers to this question:

Generating Separate Output files in Hadoop Streaming

My use case is as follows:

I have a map-only mapreduce job that takes an input file, does a lot of parsing and munging, and then writes back out. However, certain lines may or may not be in an incorrect format, and if that is the case, I would like to write the original line to a separate file.

It seems that one way to do this would be to prepend the name of the file to the line I am printing and use the multipleOutputFormat parameter. For example, if I originally had:

if line_is_valid(line):
    print name + '\t' + comments

I could instead do:

if line_is_valid(line):
    print valid_file_name + '\t' + name + '\t' + comments
else:
    # Malformed input: write the original line to the error file instead.
    print err_file_name + '\t' + line

The only problem I have with this solution is that I don't want the file name to appear as the first column in the text files. I suppose I could then run another job to strip out the first column of each file, but that seems kind of silly. So:

1) Is this the correct way to manage multiple output files with a Python MapReduce job?

2) What is the best way to get rid of that initial column?

1 Answer

You can do something like the following. It involves compiling a little Java, which I think shouldn't be a problem if you want your use case handled with a Python mapper anyway. As far as I know, Python alone cannot drop the file name from the final output in a single job, as your use case demands. The approach shown below makes it possible with little effort.

Here is the Java class that needs to be compiled:

package com.custom;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

public class CustomMultiOutputFormat extends MultipleTextOutputFormat<Text, Text> {

    /**
     * Use the key as part of the path for the final output file.
     */
    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, String leaf) {
        return new Path(key.toString(), leaf).toString();
    }

    /**
     * Discard the key, as per your requirement, so only the value is written.
     */
    @Override
    protected Text generateActualKey(Text key, Text value) {
        return null;
    }
}
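
To make the effect of the two overridden methods more concrete, here is a rough local simulation in Python. It is not Hadoop code, only a sketch of the routing logic: streaming treats the text before the first tab of each mapper output line as the key, the key picks the directory, and the key itself is dropped from what gets written. The function name `route_records` and the leaf name `part-00000` are assumptions made for illustration.

```python
from collections import defaultdict


def route_records(tagged_lines, leaf="part-00000"):
    """Group tagged mapper output by destination path, dropping the tag.

    `leaf` stands in for the per-task file name Hadoop would supply
    (assumed here to be "part-00000").
    """
    outputs = defaultdict(list)
    for line in tagged_lines:
        # Streaming splits each mapper output line at the first tab:
        # the left part is the key, the rest is the value.
        key, _, value = line.rstrip("\n").partition("\t")
        # Mirrors generateFileNameForKeyValue: <key>/<leaf>
        path = key + "/" + leaf
        # Mirrors generateActualKey returning null: only the value is kept.
        outputs[path].append(value)
    return dict(outputs)
```

Calling `route_records(["valid\tbob\thello\n", "errors\t???\n"])` groups `bob\thello` under `valid/part-00000` and `???` under `errors/part-00000`, with the tag column gone from both.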

Steps to compile:

  1. Save the code above to a file named exactly CustomMultiOutputFormat.java (the name must match the class name).
  2. From the directory where you saved that file, run:

    $JAVA_HOME/bin/javac -cp $(hadoop classpath) -d . CustomMultiOutputFormat.java

  3. Make sure JAVA_HOME is set to /path/to/your/SUNJDK before attempting the above command.

  4. Build your custom.jar file with the following command (type it exactly):

    $JAVA_HOME/bin/jar cvf custom.jar com/custom/CustomMultiOutputFormat.class

  5. Finally, run your job like this:

    hadoop jar /path/to/your/hadoop-streaming-*.jar -libjars custom.jar -outputformat com.custom.CustomMultiOutputFormat -file your_script.py -input inputpath -numReduceTasks 0 -output outputpath -mapper your_script.py

After these steps you should see two directories inside your outputpath, one named valid_file_name and the other err_file_name. All records tagged with valid_file_name will go to the valid_file_name directory, and all records tagged with err_file_name will go to the err_file_name directory.
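
For completeness, here is a minimal sketch of what your_script.py could look like with this setup. The tag values `valid` and `errors`, the `parse` helper, and its "exactly two tab-separated fields" rule are all assumptions standing in for your real parsing and validation logic. The tag becomes a directory name, so it should be a valid path component.

```python
#!/usr/bin/env python
"""Map-only streaming mapper that tags each line with its destination."""
import sys

# Hypothetical tags; each one becomes a directory under the job output path.
VALID_NAME = "valid"
ERR_NAME = "errors"


def parse(line):
    """Hypothetical parser: expects 'name<TAB>comments', else returns None."""
    parts = line.split("\t")
    if len(parts) != 2 or not parts[0]:
        return None
    return parts[0], parts[1]


def tag_line(line):
    """Return the tagged record for one input line, without a newline."""
    line = line.rstrip("\n")
    parsed = parse(line)
    if parsed is None:
        # Keep the original line intact so it can be inspected later.
        return ERR_NAME + "\t" + line
    name, comments = parsed
    return VALID_NAME + "\t" + name + "\t" + comments


def main(stdin, stdout):
    for line in stdin:
        stdout.write(tag_line(line) + "\n")


if __name__ == "__main__":
    main(sys.stdin, sys.stdout)
```

Because the output format discards the key, the files under `valid` would contain only `name<TAB>comments`, and the files under `errors` would contain the original lines.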

I hope all of this makes sense.

