I am looking for a little clarification on the answers to this question here:
Generating Separate Output files in Hadoop Streaming
My use case is as follows:
I have a map-only mapreduce job that takes an input file, does a lot of parsing and munging, and then writes back out. However, certain lines may be in an incorrect format, and if that is the case, I would like to write the original line to a separate file.
It seems that one way to do this would be to prepend the name of the file to the line I am printing and use a MultipleOutputFormat class via the -outputformat parameter. For example, if I originally had:
if line_is_valid(line):
    print name + '\t' + comments
I could instead do:
if line_is_valid(line):
    print valid_file_name + '\t' + name + '\t' + comments
else:
    print err_file_name + '\t' + line
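Put together, the mapper I have in mind would look roughly like the sketch below (a sketch only; line_is_valid, parse_line, and the two file name tags are placeholders for my real parsing and munging):

#!/usr/bin/env python
# Sketch of the mapper (Python 2 print syntax, as in the snippets above).
# line_is_valid and parse_line are placeholders for my real logic.
import sys

valid_file_name = 'valid_file_name'
err_file_name = 'err_file_name'

def line_is_valid(line):
    # placeholder validity check
    return '\t' in line

def parse_line(line):
    # placeholder parse: first field is the name, the rest are the comments
    name, comments = line.split('\t', 1)
    return name, comments

for line in sys.stdin:
    line = line.rstrip('\n')
    if line_is_valid(line):
        name, comments = parse_line(line)
        print valid_file_name + '\t' + name + '\t' + comments
    else:
        print err_file_name + '\t' + line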
The only problem I have with this solution is that I don't want the file name to appear as the first column in the text files. I suppose I could run another job to strip out the first column of each file, but that seems kind of silly. So:
1) Is this the correct way to manage multiple output files with a python mapreduce job?
2) What is the best way to get rid of that initial column?
The MultipleOutputs class provides the facility to write Hadoop map/reduce output to more than one folder. Basically, we can use MultipleOutputs when we want to write outputs other than the map reduce job's default output, or to write the output to different files specified by the user.
Each reducer produces one output file named part-r-nnnnn, where nnnnn is a running sequence number based on the number of reducers running for the job.
Let us now see how Hadoop Streaming works. The mapper and the reducer (in the above example) are scripts that read the input line by line from stdin and emit the output to stdout. The utility creates a Map/Reduce job, submits the job to an appropriate cluster, and monitors the job's progress until completion.
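To make that contract concrete, here is a minimal streaming mapper in Python (a toy word-count example, not tied to your job): it reads lines from stdin and writes tab-separated key/value pairs to stdout, and Hadoop Streaming takes care of everything else.

#!/usr/bin/env python
# Toy Hadoop Streaming mapper: read lines from stdin and emit
# tab-separated key/value pairs on stdout.
import sys

for line in sys.stdin:
    for word in line.strip().split():
        print word + '\t' + '1'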
You can do something like the following, but it involves compiling a little Java, which I think shouldn't be a problem if you want your use case handled with Python anyway. As far as I know, it is not directly possible from Python alone to drop the filename from the final output in a single job, as your use case demands. But what's shown below makes it possible with ease!
Here is the Java class that needs to be compiled -
package com.custom;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

public class CustomMultiOutputFormat extends MultipleTextOutputFormat<Text, Text> {

    /**
     * Use the key as part of the path for the final output file.
     */
    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, String leaf) {
        return new Path(key.toString(), leaf).toString();
    }

    /**
     * We discard the key, as per your requirement.
     */
    @Override
    protected Text generateActualKey(Text key, Text value) {
        return null;
    }
}
Steps to compile:
While you are in the directory where the above file is saved, type -
$JAVA_HOME/bin/javac -cp $(hadoop classpath) -d . CustomMultiOutputFormat.java
Make sure JAVA_HOME is set to /path/to/your/SUNJDK before attempting the above command.
Make your custom.jar file using (type exactly) -
$JAVA_HOME/bin/jar cvf custom.jar com/custom/CustomMultiOutputFormat.class
Finally, run your job like -
hadoop jar /path/to/your/hadoop-streaming-*.jar -libjars custom.jar -outputformat com.custom.CustomMultiOutputFormat -file your_script.py -input inputpath -numReduceTasks 0 -output outputpath -mapper your_script.py
After doing this, you should see two directories inside your outputpath: one named valid_file_name and the other named err_file_name. All records tagged with valid_file_name will go to the valid_file_name directory, and all records tagged with err_file_name will go to the err_file_name directory.
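For completeness, here is a rough sketch of what your_script.py could emit for this to work (line_is_valid is a placeholder for your own check). Streaming splits each output line at the first tab: the part before the tab becomes the key, which CustomMultiOutputFormat turns into the output directory name and then discards via generateActualKey, and the rest becomes the value that is actually written to the file.

#!/usr/bin/env python
# Sketch of your_script.py for the job above (placeholders only).
import sys

def line_is_valid(line):
    # placeholder for your real validity check
    return '\t' in line

for line in sys.stdin:
    line = line.rstrip('\n')
    if line_is_valid(line):
        # your parsed fields (name, comments, ...) would go after the tag;
        # this record lands under outputpath/valid_file_name/
        print 'valid_file_name' + '\t' + line
    else:
        # this record lands under outputpath/err_file_name/
        print 'err_file_name' + '\t' + line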
I hope all of this makes sense.