I am overriding the "next" method of the RecordReader class and the "getRecordReader" method of the TextInputFormat class in order to send a whole paragraph to the mapper instead of one line at a time. (I am using the old API, and I define a paragraph as consecutive lines appended until a blank line appears in my text file.)
Below is my code:
public class NLinesInputFormat extends TextInputFormat
{
    @Override
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter) throws IOException {
        reporter.setStatus(split.toString());
        return new ParagraphRecordReader(conf, (FileSplit) split);
    }
}
public class ParagraphRecordReader implements RecordReader<LongWritable, Text>
{
    private LineRecordReader lineRecord;
    private LongWritable lineKey;
    private Text lineValue;

    public ParagraphRecordReader(JobConf conf, FileSplit split) throws IOException {
        lineRecord = new LineRecordReader(conf, split);
        lineKey = lineRecord.createKey();
        lineValue = lineRecord.createValue();
    }

    @Override
    public void close() throws IOException {
        lineRecord.close();
    }

    @Override
    public LongWritable createKey() {
        return new LongWritable();
    }

    @Override
    public Text createValue() {
        return new Text("");
    }

    @Override
    public float getProgress() throws IOException {
        return lineRecord.getPos();
    }

    @Override
    public synchronized boolean next(LongWritable key, Text value) throws IOException {
        boolean appended, gotsomething;
        boolean retval;
        byte space[] = {' '};
        value.clear();
        gotsomething = false;
        do {
            appended = false;
            retval = lineRecord.next(lineKey, lineValue);
            if (retval) {
                if (lineValue.toString().length() > 0) {
                    byte[] rawline = lineValue.getBytes();
                    int rawlinelen = lineValue.getLength();
                    value.append(rawline, 0, rawlinelen);
                    value.append(space, 0, 1);
                    appended = true;
                }
                gotsomething = true;
            }
        } while (appended);
        //System.out.println("ParagraphRecordReader::next() returns "+gotsomething+" after setting value to: ["+value.toString()+"]");
        return gotsomething;
    }

    @Override
    public long getPos() throws IOException {
        return lineRecord.getPos();
    }
}
Questions:
1. I did not find any concrete guide on how to do this, so maybe I am doing something wrong. Any suggestions?
2. The code compiles correctly, but when I run my job the mapper runs continuously, and I cannot figure out where the problem is.
Your code works perfectly fine for me. The only change I made was to turn these classes into static inner classes.
Input file was as follows:
This is awesome.
WTF is this.

This is just a test.
The mapper code looked like:
@Override
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
System.out.println(key+" : "+value);
}
And the output was:
0 : This is awesome. WTF is this.
0 : This is just a test.
I am sure you haven't forgotten to set the input format, but just in case, set it as follows:
conf.setInputFormat(NLinesInputFormat.class);
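If you want to check the paragraph-grouping logic on its own, without a Hadoop job, the loop in next() can be mirrored with plain java.io classes. The following is only a minimal sketch: ParagraphSketch, nextParagraph and paragraphs are hypothetical names made up for illustration, and a BufferedReader stands in for LineRecordReader. It assumes, as in the question, that a paragraph ends at a completely empty line.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Hadoop-free sketch; none of these names belong to the Hadoop API.
public class ParagraphSketch {

    // Mirrors the loop in ParagraphRecordReader.next(): append each non-empty
    // line followed by a space, and stop at a blank line or at end of input.
    // Returns true if at least one line (blank or not) was consumed.
    static boolean nextParagraph(BufferedReader in, StringBuilder value) throws IOException {
        value.setLength(0);
        boolean gotSomething = false;
        boolean appended;
        do {
            appended = false;
            String line = in.readLine(); // null plays the role of next() returning false
            if (line != null) {
                if (!line.isEmpty()) {
                    value.append(line).append(' ');
                    appended = true;
                }
                gotSomething = true;
            }
        } while (appended);
        return gotSomething;
    }

    // Collects every record the loop above would hand to a mapper.
    static List<String> paragraphs(String text) {
        List<String> result = new ArrayList<>();
        StringBuilder value = new StringBuilder();
        try (BufferedReader in = new BufferedReader(new StringReader(text))) {
            while (nextParagraph(in, value)) {
                result.add(value.toString());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        String input = "This is awesome.\nWTF is this.\n\nThis is just a test.\n";
        for (String paragraph : paragraphs(input)) {
            System.out.println("[" + paragraph + "]");
        }
    }
}
```

With the sample input above, this yields two records, matching the two lines the mapper printed. Each record keeps the trailing space that the original loop appends after every line, and several blank lines in a row would produce empty records, just as they would in next().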