I see a mechanism to override the delimiter between key and value using mapreduce.textoutputformat.separator (using 1.0.3 of the API). But I want to be able to control the separator between records. FYI, I am using ArrayWritable as the value and NullWritable as the key.
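For reference, the key/value override I mean is just a job configuration property, roughly like this (a sketch of the driver setup; only the relevant lines are shown):

Configuration conf = new Configuration();
// Replaces the TAB written between key and value by TextOutputFormat;
// it does not affect the newline written between records.
conf.set("mapreduce.textoutputformat.separator", ",");
Job job = new Job(conf, "example-job");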
As far as I know this is not possible, because TextOutputFormat uses toString() to get the text representation of the values, and ArrayWritable doesn't implement toString(), so you would probably end up with the default Object.toString() if you wrote an ArrayWritable to the output of your Reducer. Or maybe you meant to change the separator between lines, in which case it's the same issue: TextOutputFormat uses a \n character by default, as pointed out by climbage.
That being said, you could do it by implementing a custom output format in which you define your own RecordWriter and read custom configuration properties in the getRecordWriter method. Here is a quick & dirty implementation of such a class (not tested) which should do what you need: it lets you control the separator inside an ArrayWritable via the property mapred.arraywritable.separator and the separator between lines via mapred.line.separator:
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
public class ArrayTextOutputFormat<K, V> extends TextOutputFormat<K, V> {

    protected static class ArrayLineRecordWriter<K, V> extends LineRecordWriter<K, V> {

        private static final String utf8 = "UTF-8";

        private final byte[] arraySeparator;
        private final byte[] keyValueSeparator;
        private final byte[] lineSeparator;

        public ArrayLineRecordWriter(DataOutputStream out, String keyValueSeparator,
                String arraySeparator, String lineSeparator) {
            super(out);
            try {
                this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
                this.arraySeparator = arraySeparator.getBytes(utf8);
                this.lineSeparator = lineSeparator.getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        // Writes a single key or value: Text is written as raw bytes, an
        // ArrayWritable is expanded with the configured array separator between
        // its elements, and anything else falls back to toString().
        private void writeObject(Object o) throws IOException {
            if (o instanceof Text) {
                Text to = (Text) o;
                out.write(to.getBytes(), 0, to.getLength());
            } else if (o instanceof ArrayWritable) {
                String[] strings = ((ArrayWritable) o).toStrings();
                for (int i = 0; i < strings.length; i++) {
                    if (i > 0) {
                        out.write(arraySeparator);
                    }
                    out.write(strings[i].getBytes(utf8));
                }
            } else {
                out.write(o.toString().getBytes(utf8));
            }
        }

        public synchronized void write(K key, V value) throws IOException {
            boolean nullKey = key == null || key instanceof NullWritable;
            boolean nullValue = value == null || value instanceof NullWritable;
            if (nullKey && nullValue) {
                return;
            }
            if (!nullKey) {
                writeObject(key);
            }
            if (!(nullKey || nullValue)) {
                out.write(keyValueSeparator);
            }
            if (!nullValue) {
                writeObject(value);
            }
            // Record separator, configurable instead of the "\n" hard-coded
            // by the stock LineRecordWriter.
            out.write(lineSeparator);
        }
    }

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        boolean isCompressed = getCompressOutput(job);
        // Separators come from the job configuration, with sensible defaults.
        String keyValueSeparator = conf.get("mapred.textoutputformat.separator", "\t");
        String arraySeparator = conf.get("mapred.arraywritable.separator", "|");
        String lineSeparator = conf.get("mapred.line.separator", "\n");
        CompressionCodec codec = null;
        String extension = "";
        if (isCompressed) {
            Class<? extends CompressionCodec> codecClass =
                    getOutputCompressorClass(job, GzipCodec.class);
            codec = ReflectionUtils.newInstance(codecClass, conf);
            extension = codec.getDefaultExtension();
        }
        Path file = getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);
        FSDataOutputStream fileOut = fs.create(file, false);
        if (!isCompressed) {
            return new ArrayLineRecordWriter<K, V>(fileOut, keyValueSeparator,
                    arraySeparator, lineSeparator);
        } else {
            return new ArrayLineRecordWriter<K, V>(
                    new DataOutputStream(codec.createOutputStream(fileOut)),
                    keyValueSeparator, arraySeparator, lineSeparator);
        }
    }
}
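To use it, you would point the job at this output format and set the separator properties in the driver, along these lines (a minimal sketch; the job name and separator values are just examples, and mapper/reducer classes and input/output paths are omitted):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;

Configuration conf = new Configuration();
conf.set("mapred.textoutputformat.separator", "\t"); // between key and value
conf.set("mapred.arraywritable.separator", "|");     // between array elements
conf.set("mapred.line.separator", ";");              // between records

Job job = new Job(conf, "array-output-example");
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(ArrayWritable.class);        // or your ArrayWritable subclass
job.setOutputFormatClass(ArrayTextOutputFormat.class);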