 

Multiple lines of text to a single map

I've been trying to get Hadoop to send N lines of input to a single map call. I don't need the lines to be split apart beforehand.

I've tried using NLineInputFormat, but that sends N lines of text to each mapper one line at a time [giving up after the Nth line].

I have also tried setting the following option, but it still takes N lines of input and sends them one line at a time to each map:

    job.setInt("mapred.line.input.format.linespermap", 10);

I've found a mailing list post recommending that I override LineRecordReader::next, but that isn't straightforward, since the internal data members are all private.

I've just checked the source for NLineInputFormat and it hard codes LineReader, so overriding will not help.

Also, by the way, I'm using Hadoop 0.18 for compatibility with Amazon Elastic MapReduce.

asked Apr 26 '10 by monksy

3 Answers

You have to implement your own input format. Doing so also gives you the possibility of defining your own record reader.

Unfortunately, you also have to define a getSplits() method. In my opinion this will be harder than implementing the record reader: that method has to implement the logic for chunking the input data.

See the following excerpt from "Hadoop: The Definitive Guide" (a great book I would always recommend!):

Here’s the interface:

public interface InputFormat<K, V> {
  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
  RecordReader<K, V> getRecordReader(InputSplit split,
                                     JobConf job, 
                                     Reporter reporter) throws IOException;
}

The JobClient calls the getSplits() method, passing the desired number of map tasks as the numSplits argument. This number is treated as a hint, as InputFormat implementations are free to return a different number of splits to the number specified in numSplits. Having calculated the splits, the client sends them to the jobtracker, which uses their storage locations to schedule map tasks to process them on the tasktrackers.

On a tasktracker, the map task passes the split to the getRecordReader() method on InputFormat to obtain a RecordReader for that split. A RecordReader is little more than an iterator over records, and the map task uses one to generate record key-value pairs, which it passes to the map function. A code snippet (based on the code in MapRunner) illustrates the idea:

K key = reader.createKey();
V value = reader.createValue();
while (reader.next(key, value)) {
  mapper.map(key, value, output, reporter);
} 
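For the old (0.18-style) API the question targets, the skeleton of such an input format might look roughly as follows. This is only a sketch under assumptions: NLinesInputFormat and NLinesRecordReader are hypothetical placeholder names, and it simply reuses FileInputFormat's default getSplits() (block-based splits); you would override getSplits() yourself if every split must contain exactly N lines.

// Sketch only: NLinesInputFormat and NLinesRecordReader are hypothetical names,
// not classes that ship with Hadoop.
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class NLinesInputFormat extends FileInputFormat<LongWritable, Text> {

    // getSplits() is inherited from FileInputFormat (roughly one split per block);
    // override it here if each split must hold exactly N lines.

    @Override
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
                                                            JobConf job,
                                                            Reporter reporter) throws IOException {
        reporter.setStatus(split.toString());
        // A hypothetical reader that concatenates N lines into each value,
        // along the lines of the readers shown in the other answers.
        return new NLinesRecordReader((FileSplit) split, job);
    }
}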
answered Nov 03 '22 by Peter Wippermann


I solved this problem recently by simply creating my own InputFormat that extends NLineInputFormat and implements a custom MultiLineRecordReader instead of the default LineRecordReader.

I chose to extend NLineInputFormat because I wanted to have the same guarantee of having exactly N line(s) per split.

This record reader is taken almost as is from http://bigdatacircus.com/2012/08/01/wordcount-with-custom-record-reader-of-textinputformat/

The only things I modified are the property for maxLineLength, which now uses the new API, and the value of NLINESTOPROCESS, which is read from NLineInputFormat's setNumLinesPerSplit() instead of being hardcoded (for more flexibility).

Here is the result:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.util.LineReader;

public class MultiLineInputFormat extends NLineInputFormat{
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit genericSplit, TaskAttemptContext context) {
        context.setStatus(genericSplit.toString());
        return new MultiLineRecordReader();
    }

    public static class MultiLineRecordReader extends RecordReader<LongWritable, Text>{
        private int NLINESTOPROCESS;   // N: lines concatenated into one record, read from the job configuration
        private LineReader in;
        private LongWritable key;
        private Text value = new Text();
        private long start =0;
        private long end =0;
        private long pos =0;
        private int maxLineLength;

        @Override
        public void close() throws IOException {
            if (in != null) {
                in.close();
            }
        }

        @Override
        public LongWritable getCurrentKey() throws IOException,InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            if (start == end) {
                return 0.0f;
            }
            else {
                return Math.min(1.0f, (pos - start) / (float)(end - start));
            }
        }

        // Open the file and position the stream at the start of this split. If the
        // split does not begin at offset 0, skip the partial first line, since the
        // previous split's reader will have consumed it.
        @Override
        public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
            NLINESTOPROCESS = getNumLinesPerSplit(context);
            FileSplit split = (FileSplit) genericSplit;
            final Path file = split.getPath();
            Configuration conf = context.getConfiguration();
            this.maxLineLength = conf.getInt("mapreduce.input.linerecordreader.line.maxlength",Integer.MAX_VALUE);
            FileSystem fs = file.getFileSystem(conf);
            start = split.getStart();
            end= start + split.getLength();
            boolean skipFirstLine = false;
            FSDataInputStream filein = fs.open(split.getPath());

            if (start != 0){
                skipFirstLine = true;
                --start;
                filein.seek(start);
            }
            in = new LineReader(filein,conf);
            if(skipFirstLine){
                start += in.readLine(new Text(),0,(int)Math.min((long)Integer.MAX_VALUE, end - start));
            }
            this.pos = start;
        }

        // Read up to NLINESTOPROCESS lines and append them, separated by newlines,
        // into a single value; the key is the byte offset of the first line. Returns
        // false once no further line can be read from this split.
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (key == null) {
                key = new LongWritable();
            }
            key.set(pos);
            if (value == null) {
                value = new Text();
            }
            value.clear();
            final Text endline = new Text("\n");
            int newSize = 0;
            for(int i=0;i<NLINESTOPROCESS;i++){
                Text v = new Text();
                while (pos < end) {
                    newSize = in.readLine(v, maxLineLength,Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),maxLineLength));
                    value.append(v.getBytes(),0, v.getLength());
                    value.append(endline.getBytes(),0, endline.getLength());
                    if (newSize == 0) {
                        break;
                    }
                    pos += newSize;
                    if (newSize < maxLineLength) {
                        break;
                    }
                }
            }
            if (newSize == 0) {
                key = null;
                value = null;
                return false;
            } else {
                return true;
            }
        }
    }

}
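For completeness, the driver side then only needs to select this input format and configure N. A minimal sketch (the job variable is an ordinary new-API Job, and the value 10 is just an example):

job.setInputFormatClass(MultiLineInputFormat.class);    // use the custom input format
NLineInputFormat.setNumLinesPerSplit(job, 10);          // N lines per split and per record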
answered Nov 03 '22 by Matt Fortier


I think that in your case you can follow the delegation pattern and implement a wrapper around LineRecordReader that overrides the necessary methods, i.e. next() (or nextKeyValue() in the new API), to set the value to a concatenation of N lines rather than a single line.
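A rough sketch of that wrapper for the new API is below. The class name NLinesRecordReader, its constructor parameter, and the way N reaches the reader are my own assumptions; only LineRecordReader and the RecordReader contract come from Hadoop itself.

// Delegation sketch (new API). NLinesRecordReader is a hypothetical name; passing N
// in through the constructor is an assumption as well.
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class NLinesRecordReader extends RecordReader<LongWritable, Text> {
    private final LineRecordReader delegate = new LineRecordReader();
    private final int linesPerRecord;                    // N
    private final LongWritable key = new LongWritable();
    private final Text value = new Text();

    public NLinesRecordReader(int linesPerRecord) {
        this.linesPerRecord = linesPerRecord;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        delegate.initialize(split, context);             // let the plain reader handle seeking
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        value.clear();
        int linesRead = 0;
        while (linesRead < linesPerRecord && delegate.nextKeyValue()) {
            if (linesRead == 0) {
                key.set(delegate.getCurrentKey().get()); // offset of the first line in the group
            }
            Text line = delegate.getCurrentValue();
            value.append(line.getBytes(), 0, line.getLength());
            value.append("\n".getBytes(), 0, 1);
            linesRead++;
        }
        return linesRead > 0;                            // false once the split is exhausted
    }

    @Override
    public LongWritable getCurrentKey() { return key; }

    @Override
    public Text getCurrentValue() { return value; }

    @Override
    public float getProgress() throws IOException {
        return delegate.getProgress();
    }

    @Override
    public void close() throws IOException {
        delegate.close();
    }
}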

By googling I have found an exemplary implementation of ParagraphRecordReader, which uses LineRecordReader to read input data line by line (and concatenate it) until it encounters either EOF or a blank line. It then returns a key-value pair whose value is a paragraph (instead of one line). Moreover, the ParagraphInputFormat for this ParagraphRecordReader is as simple as the standard TextInputFormat.
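For illustration, the input format side typically amounts to nothing more than handing out the record reader. This is only my sketch of what "as simple as TextInputFormat" means here; ParagraphRecordReader itself comes from the linked post and is not defined in this snippet.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class ParagraphInputFormat extends FileInputFormat<LongWritable, Text> {
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
                                                               TaskAttemptContext context) {
        // ParagraphRecordReader is the paragraph-at-a-time reader described above.
        return new ParagraphRecordReader();
    }
}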

You can find the necessary links to this implementation, and a few words about it, in the following post: http://hadoop-mapreduce.blogspot.com/2011/03/little-more-complicated-recordreaders.html.

Best

answered Nov 03 '22 by kawaa