I'm new to Hadoop and tried to write a relational join using Hadoop. The algorithm joins three relations in two consecutive rounds, using a recursive method. The program works fine, but during execution it prints a warning like this:
14/12/02 10:41:16 WARN io.ReadaheadPool: Failed readahead on ifile
EBADF: Bad file descriptor
at org.apache.hadoop.io.nativeio.NativeIO$POSIX.posix_fadvise(Native Method)
at org.apache.hadoop.io.nativeio.NativeIO$POSIX.posixFadviseIfPossible(NativeIO.java:263)
at org.apache.hadoop.io.nativeio.NativeIO$POSIX$CacheManipulator.posixFadviseIfPossible(NativeIO.java:142)
at org.apache.hadoop.io.ReadaheadPool$ReadaheadRequestImpl.run(ReadaheadPool.java:206)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
It is annoying, and I want to know the cause of the problem and how to get rid of it. My code is below:
import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Recursive {
/**
* Join three relations together using recursive method
* R JOIN S JOIN T = ((R JOIN S) JOIN T)
*/
static String[] relationSequence; // Keeps sequence of relations in join
static int round; // Current round number
/**
* Mapper
* Relation name = R
* Input tuple = a b
* Output pair = (b, (R,a))
* We assume that the join attribute is the last attribute of the first relation
* and the first attribute of the second relation.
* Under this assumption, the MapReduce algorithm works for any number of attributes
*/
public static class joinMapper extends Mapper<Object, Text, IntWritable, Text>{
public void map(Object keyIn, Text valueIn, Context context) throws IOException, InterruptedException {
// Read tuple and put attributes in a string array
String curValue = valueIn.toString();
String[] values = curValue.split("\t");
// Get relation name from input file name
String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
// Determine the join attribute index: the first attribute if this file is
// the second relation of the current round, otherwise the last attribute
int joinIndex;
String others;
if(fileName.compareTo(relationSequence[round])==0){
joinIndex = 0;
// Keep everything after the join attribute and its trailing tab
others = curValue.substring(values[0].length() + 1);
}else{
joinIndex = values.length - 1;
// Keep everything before the join attribute and its leading tab
others = curValue.substring(0, curValue.length() - values[joinIndex].length() - 1);
}
IntWritable joinValue = new IntWritable(Integer.parseInt(values[joinIndex]));
// Tag the non-join attributes with the relation (file) name
Text temp = new Text(fileName + "|" + others);
context.write(joinValue,temp);
}
}
/**
* Reducer
*
* 1. Divide the input values into two ArrayLists based on relation name:
* a. first relation
* b. second relation
* 2. If the second relation is empty, there is nothing to join for this key, so skip it.
* 3. For each element of the first list, join it with all elements of
* the second list
*/
public static class joinReducer extends Reducer<IntWritable, Text, Text, Text>{
public void reduce(IntWritable keyIn, Iterable<Text> valueIn, Context context)
throws IOException, InterruptedException{
ArrayList<String> firstRelation = new ArrayList<String>();
ArrayList<String> secondRelation = new ArrayList<String>();
for (Text value : valueIn) {
String[] values = value.toString().split("\\|");
if(values[0].compareTo(relationSequence[round])==0){
secondRelation.add(values[1]);
}else{
firstRelation.add(values[1]);
}
}
if(secondRelation.size()>0){
for (String firstItem : firstRelation) {
for (String secondItem : secondRelation) {
context.write(new Text(firstItem), new Text(keyIn.toString() + "\t" + secondItem));
}
}
}
}
}
/**
* Partitioner
*
* To hash keys to reduce tasks, we use a bitwise AND, which is
* faster than the modulo operator.
*/
public static class joinPartitioner extends Partitioner<IntWritable, Text> {
@Override
public int getPartition(IntWritable key, Text value, int numReduceTasks) {
// Bitwise AND with 0x7F maps keys to partitions 0-127 (matching the 128 reducers)
int partitionNumber = key.get()&0x007F;
return partitionNumber;
}
}
/**
* Main method
*
* (R join S join T)
* hadoop jar ~/COMP6521.jar Recursive /input/R /input/S /input2/T /output R,S,T
*
* @param args
* <br> args[0]: first relation
* <br> args[1]: second relation
* <br> args[2]: third relation
* <br> args[3]: output directory
* <br> args[4]: relation sequence to join, separated by comma
*/
public static void main(String[] args) throws IllegalArgumentException, IOException, InterruptedException, ClassNotFoundException {
long s = System.currentTimeMillis();
/****** Preparing problem variables *******/
relationSequence = args[4].split(","); // Keep sequence of relations
round = 1; // Variable to keep current round number
int maxOfReducers = 128; // Maximum number of available reducers
int noReducers; // Number of reducers for one particular job
noReducers = maxOfReducers;
Path firstRelation = new Path(args[0]);
Path secondRelation = new Path(args[1]);
Path thirdRelation = new Path(args[2]);
Path temp = new Path("/temp"); // Temporary path to keep intermediate result
Path out = new Path(args[3]);
/****** End of variable Preparing *******/
Configuration conf = new Configuration();
/****** Configuring first job *******/
// General configuration
Job job = Job.getInstance(conf, "Recursive multi-way join (first round)");
job.setNumReduceTasks(noReducers);
// Pass appropriate classes
job.setJarByClass(Recursive.class);
job.setMapperClass(joinMapper.class);
job.setPartitionerClass(joinPartitioner.class);
job.setReducerClass(joinReducer.class);
// Specify input and output type of reducers
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(temp)){ fs.delete(temp, true);}
if(fs.exists(out)) { fs.delete(out, true); }
// Specify the input and output paths
FileInputFormat.addInputPath(job, firstRelation);
FileInputFormat.addInputPath(job, secondRelation);
FileOutputFormat.setOutputPath(job, temp);
/****** End of first job configuration *******/
job.submit();
// Running the first job
boolean b = job.waitForCompletion(true);
if(b){
// try to execute the second job after completion of the first one
round++; // Specify round number
Configuration conf2 = new Configuration(); // Create new configuration object
/****** Configuring second job *******/
// General configuration
Job job2 = Job.getInstance(conf2, "Recursive multi-way join (second round)");
job2.setNumReduceTasks(noReducers);
// Pass appropriate classes
job2.setJarByClass(Recursive.class);
job2.setMapperClass(joinMapper.class);
job2.setPartitionerClass(joinPartitioner.class);
job2.setReducerClass(joinReducer.class);
// Specify input and output type of reducers
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(Text.class);
// Specify input and output type of mappers
job2.setMapOutputKeyClass(IntWritable.class);
job2.setMapOutputValueClass(Text.class);
// End of 2014-11-25
// Specify the input and output paths
FileInputFormat.addInputPath(job2, temp);
FileInputFormat.addInputPath(job2, thirdRelation);
FileOutputFormat.setOutputPath(job2, out);
/****** End of second job configuration *******/
job2.submit();
// Running the second job
b = job2.waitForCompletion(true);
// Output time measurement
long e = System.currentTimeMillis() - s;
System.out.println("Total: " + e);
System.exit(b ? 0 : 1);
}
System.exit(1);
}
}
I had a similar error and ended up on your question, as well as on this mailing-list thread: EBADF: Bad file descriptor
To clarify a little bit, the readahead pool can sometimes spit out this message if you close a file while a readahead request is in flight. It's not an error and just reflects the fact that the file was closed hastily, probably because of some other bug which is the real problem.
In my case, I was closing a writer without flushing it with hflush().
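For reference, here is a minimal sketch of what I mean (the class name and output path are hypothetical, purely for illustration): flush the HDFS output stream with hflush() before closing it.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HflushExample {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // Hypothetical output path, only for illustration
        FSDataOutputStream out = fs.create(new Path("/tmp/hflush-example.txt"));
        try {
            out.writeBytes("some data\n");
            out.hflush(); // flush to the datanodes before closing
        } finally {
            out.close(); // closing without the flush was what triggered the warning for me
        }
    }
}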
Since you don't seem to use a writer or a reader by hand, I would have a look at how you are submitting the MR jobs.
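For example (and this is only a guess at what might matter): in your main() you call both job.submit() and job.waitForCompletion(true). waitForCompletion() already submits the job if it has not been submitted yet, so the explicit submit() calls are redundant. A minimal sketch of the chaining without them (the job configuration is omitted; it is the same as in your code):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SubmitSketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "Recursive multi-way join (first round)");
        // ... mapper/reducer/partitioner/input/output setup exactly as in the question ...
        if (job.waitForCompletion(true)) { // submits the first job itself and blocks until it finishes
            Job job2 = Job.getInstance(new Configuration(), "Recursive multi-way join (second round)");
            // ... second-round setup ...
            System.exit(job2.waitForCompletion(true) ? 0 : 1); // same for the second job
        }
        System.exit(1);
    }
}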