In Hadoop is it possible to specify the record delimiter for TextOutputFormat

Tags:

java

hadoop

I see a mechanism to override the delimiter between key and value using mapreduce.textoutputformat.separator (using 1.03 of api). But I want to be able to control the separator between records. FYI I am using ArrayWritable as the value and NullWritable as the key.
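For reference, the key/value separator mentioned above is set on the job configuration in the driver, along these lines (a sketch; the exact property name varies across Hadoop versions, e.g. it became mapreduce.output.textoutputformat.separator in later releases):

```java
import org.apache.hadoop.conf.Configuration;

// Fragment of a driver: override the tab that TextOutputFormat
// places between key and value (property name as in the question).
Configuration conf = new Configuration();
conf.set("mapreduce.textoutputformat.separator", ",");
```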

Tom Depke asked Jun 04 '13


1 Answer

As far as I know this is not possible directly, because TextOutputFormat calls toString() to get the text representation of the values, and ArrayWritable doesn't override toString(), so writing an ArrayWritable to the output of your Reducer would give you the default Object.toString(). Or perhaps you meant to change the separator between lines, in which case it's the same issue: TextOutputFormat uses a \n character by default, as pointed out by climbage.
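To illustrate the toString() point with a plain-Java sketch (using a hypothetical stand-in class, not Hadoop's actual ArrayWritable): a class that doesn't override toString() falls back to Object.toString(), which prints the class name and a hash code rather than the contents.

```java
public class ToStringDemo {
    // Hypothetical stand-in for a class that, like ArrayWritable,
    // does not override toString()
    static class FakeArrayWritable {
        String[] values = { "a", "b", "c" };
    }

    public static void main(String[] args) {
        String s = new FakeArrayWritable().toString();
        // Object.toString() yields "ClassName@hexHashCode", so the
        // array contents never appear in the output
        System.out.println(s.matches(".*FakeArrayWritable@[0-9a-f]+"));
    }
}
```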

That being said, you could do it by implementing a custom output format, where you define your own RecordWriter and read custom configuration properties in the getRecordWriter method. Here is a quick & dirty implementation of such a class (not tested) which should do what you need: it lets you control the separator between the elements of an ArrayWritable via the property mapred.arraywritable.separator and the separator between lines via mapred.line.separator:

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class ArrayTextOutputFormat<K, V> extends TextOutputFormat<K, V> {

    protected static class ArrayLineRecordWriter<K, V> extends
            LineRecordWriter<K, V> {

        private static final String utf8 = "UTF-8";
        private final byte[] arraySeparator;
        private final byte[] keyValueSeparator;
        private final byte[] lineSeparator;

        public ArrayLineRecordWriter(DataOutputStream out,
                String keyValueSeparator, String arraySeparator,
                String lineSeparator) {
            super(out);
            try {
                this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
                this.arraySeparator = arraySeparator.getBytes(utf8);
                this.lineSeparator = lineSeparator.getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8
                        + " encoding");
            }
        }

        private void writeObject(Object o) throws IOException {
            if (o instanceof Text) {
                Text to = (Text) o;
                out.write(to.getBytes(), 0, to.getLength());
            } else if (o instanceof ArrayWritable) {
                // Join the array elements with the custom separator,
                // without emitting a trailing separator
                String[] strings = ((ArrayWritable) o).toStrings();
                for (int i = 0; i < strings.length; i++) {
                    if (i > 0) {
                        out.write(arraySeparator);
                    }
                    out.write(strings[i].getBytes(utf8));
                }
            } else {
                out.write(o.toString().getBytes(utf8));
            }
        }

        @Override
        public synchronized void write(K key, V value) throws IOException {
            boolean nullKey = key == null || key instanceof NullWritable;
            boolean nullValue = value == null || value instanceof NullWritable;
            if (nullKey && nullValue) {
                return;
            }
            if (!nullKey) {
                writeObject(key);
            }
            if (!(nullKey || nullValue)) {
                out.write(keyValueSeparator);
            }
            if (!nullValue) {
                writeObject(value);
            }
            out.write(lineSeparator);
        }
    }

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        boolean isCompressed = getCompressOutput(job);
        String keyValueSeparator = conf.get(
                "mapred.textoutputformat.separator", "\t");
        String arraySeparator = conf.get("mapred.arraywritable.separator", "|");
        // Default to a newline so the writer still works when the
        // property isn't set
        String lineSeparator = conf.get("mapred.line.separator", "\n");
        CompressionCodec codec = null;
        String extension = "";
        if (isCompressed) {
            Class<? extends CompressionCodec> codecClass =
                    getOutputCompressorClass(job, GzipCodec.class);
            codec = ReflectionUtils.newInstance(codecClass, conf);
            extension = codec.getDefaultExtension();
        }
        Path file = getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);
        FSDataOutputStream fileOut = fs.create(file, false);
        if (!isCompressed) {
            return new ArrayLineRecordWriter<K, V>(fileOut, keyValueSeparator,
                    arraySeparator, lineSeparator);
        } else {
            return new ArrayLineRecordWriter<K, V>(new DataOutputStream(
                    codec.createOutputStream(fileOut)), keyValueSeparator,
                    arraySeparator, lineSeparator);
        }
    }
}
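To wire this in, the driver would set the output format class and the two separator properties, along these lines (a sketch only, assuming the new-API Job setup of Hadoop 1.x and the ArrayTextOutputFormat class above):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

// Fragment of a driver: the rest of the job setup (mapper, reducer,
// input/output paths, key/value classes) is unchanged.
Configuration conf = new Configuration();
conf.set("mapred.arraywritable.separator", ",");   // between array elements
conf.set("mapred.line.separator", "\r\n");         // between records
Job job = new Job(conf, "array output example");
job.setOutputFormatClass(ArrayTextOutputFormat.class);
```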
Charles Menguy answered Sep 30 '22