Can someone please point out where I could find an implementation for CombineFileInputFormat
(org. using Hadoop 0.20.205? this is to create large splits from very small log files (text in lines) using EMR.
It is surprising that Hadoop does not have a default implementation for this class made specifically for this purpose and googling it looks like I'm not the only one confused by this. I need to compile the class and bundle it in a jar for hadoop-streaming, with a limited knowledge of Java this is some challenge.
Edit: I already tried the yetitrails example, with the necessary imports but I get a compiler error for the next method.
Here is an implementation I have for you:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;
@SuppressWarnings("deprecation")
public class CombinedInputFormat extends CombineFileInputFormat<LongWritable, Text> {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter) throws IOException {
return new CombineFileRecordReader(conf, (CombineFileSplit) split, reporter, (Class) myCombineFileRecordReader.class);
}
public static class myCombineFileRecordReader implements RecordReader<LongWritable, Text> {
private final LineRecordReader linerecord;
public myCombineFileRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer index) throws IOException {
FileSplit filesplit = new FileSplit(split.getPath(index), split.getOffset(index), split.getLength(index), split.getLocations());
linerecord = new LineRecordReader(conf, filesplit);
}
@Override
public void close() throws IOException {
linerecord.close();
}
@Override
public LongWritable createKey() {
// TODO Auto-generated method stub
return linerecord.createKey();
}
@Override
public Text createValue() {
// TODO Auto-generated method stub
return linerecord.createValue();
}
@Override
public long getPos() throws IOException {
// TODO Auto-generated method stub
return linerecord.getPos();
}
@Override
public float getProgress() throws IOException {
// TODO Auto-generated method stub
return linerecord.getProgress();
}
@Override
public boolean next(LongWritable key, Text value) throws IOException {
// TODO Auto-generated method stub
return linerecord.next(key, value);
}
}
}
In your job first set the parameter mapred.max.split.size
according to the size you would like the input files to be combined into. Do something like follows in your run():
...
if (argument != null) {
conf.set("mapred.max.split.size", argument);
} else {
conf.set("mapred.max.split.size", "134217728"); // 128 MB
}
...
conf.setInputFormat(CombinedInputFormat.class);
...
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With