hadoop warn EBADF: Bad file descriptor

I'm new to Hadoop and tried to write a relational join using it. The algorithm joins three relations in two consecutive rounds, using a recursive approach. The program works fine, but during execution it prints warnings like this:

14/12/02 10:41:16 WARN io.ReadaheadPool: Failed readahead on ifile
EBADF: Bad file descriptor
        at org.apache.hadoop.io.nativeio.NativeIO$POSIX.posix_fadvise(Native Method)
        at org.apache.hadoop.io.nativeio.NativeIO$POSIX.posixFadviseIfPossible(NativeIO.java:263)
        at org.apache.hadoop.io.nativeio.NativeIO$POSIX$CacheManipulator.posixFadviseIfPossible(NativeIO.java:142)
        at org.apache.hadoop.io.ReadaheadPool$ReadaheadRequestImpl.run(ReadaheadPool.java:206)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

It is annoying, and I want to know the cause of the problem and how to get rid of it. My code is below:

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Recursive {
    /**
     * Join three relations together using recursive method
     * R JOIN S JOIN T = ((R JOIN S) JOIN T)
     */
    static String[] relationSequence;           // Keeps sequence of relations in join
    static int round;                           // Round number running
    /**
     * Mapper
     * Relation name = R
     * Input tuple   = a    b
     * Output pair   = (b, (R,a))
     * We assume that the join value is the last attribute of the first relation
     * and the first attribute of the second relation.
     * Under this assumption, the map-reduce algorithm works for any number of attributes.
     */
    public static class joinMapper extends Mapper<Object, Text, IntWritable, Text>{
        public void map(Object keyIn, Text valueIn, Context context) throws IOException, InterruptedException {
            // Read tuple and put attributes in a string array
            String curValue = valueIn.toString();
            String[] values = curValue.split("\t");
            // Get relation name from input file name
            String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            // Get join attribute index number R join S
            int joinIndex;
            String others = "";
            if(fileName.compareTo(relationSequence[round])==0){
                joinIndex = 0;
                // Drop the join attribute and the tab that follows it
                others = curValue.substring(values[0].length() + 1);
            }else{
                joinIndex = values.length - 1;
                // Drop the trailing tab and the join attribute
                others = curValue.substring(0, curValue.length() - values[joinIndex].length() - 1);
            }
            IntWritable joinValue = new IntWritable(Integer.parseInt(values[joinIndex]));

            // Create list of attributes which are not join attribute
            Text temp = new Text(fileName + "|" + others);
            context.write(joinValue,temp);
        }
    }

    /**
     * Reducer
     * 
     *  1. Divide the input list in two ArrayLists based on relation name:
     *      a. first relation
     *      b. second relation
     *  2. Check whether the second relation is empty. If it is, there is nothing to join, so stop.
     *  3. For each element of the first array list, join it with the all elements in
     *     the second array list
     */
    public static class joinReducer extends Reducer<IntWritable, Text, Text, Text>{
        public void reduce(IntWritable keyIn, Iterable<Text> valueIn, Context context)
                throws IOException, InterruptedException{
            ArrayList<String> firstRelation = new ArrayList<String>();
            ArrayList<String> secondRelation = new ArrayList<String>();
            for (Text value : valueIn) {
                String[] values = value.toString().split("\\|");
                if(values[0].compareTo(relationSequence[round])==0){
                    secondRelation.add(values[1]);
                }else{
                    firstRelation.add(values[1]);
                }
            }
            if(secondRelation.size()>0){
                for (String firstItem : firstRelation) {
                    for (String secondItem : secondRelation) {
                        context.write(new Text(firstItem.toString()), new Text(keyIn.toString() + "\t"
                                                                            + secondItem.toString() 
                                                                            ));
                    }
                }
            }
        }

    }

    /**
     * Partitioner
     * 
     * To hash keys to reducer tasks, we use a bitwise AND (key & 0x007F),
     * which is faster than the modulo operator.
     */
    public static class joinPartitioner extends Partitioner<IntWritable, Text> {
        public int getPartition(IntWritable key, Text value, int numReduceTasks) {
                int partitionNumber = key.get()&0x007F;
                return partitionNumber;
            }
     }

     /**
      * Main method
      * 
      * (R join S join T)
      * hadoop jar ~/COMP6521.jar Recursive /input/R /input/S /input2/T /output R,S,T
      * 
      * @param args
      * <br> args[0]: first relation
      * <br> args[1]: second relation
      * <br> args[2]: third relation
      * <br> args[3]: output directory
      * <br> args[4]: relation sequence to join, separated by comma
      */
    public static void main(String[] args) throws IllegalArgumentException, IOException, InterruptedException, ClassNotFoundException {
        long s = System.currentTimeMillis();
        /****** Preparing problem variables *******/
        relationSequence = args[4].split(",");      // Keep sequence of relations
        round = 1;                                  // Variable to keep current round number
        int maxOfReducers = 128;                    // Maximum number of available reducers
        int noReducers;                             // Number of reducers for one particular job
        noReducers = maxOfReducers;

        Path firstRelation  = new Path(args[0]);
        Path secondRelation = new Path(args[1]);
        Path thirdRelation  = new Path(args[2]);
        Path temp           = new Path("/temp");    // Temporary path to keep intermediate result
        Path out            = new Path(args[3]);
        /****** End of variable Preparing   *******/

        Configuration conf = new Configuration();

        /****** Configuring first job *******/
//      General configuration
        Job job = Job.getInstance(conf, "Recursive multi-way join (first round)");
        job.setNumReduceTasks(noReducers);

//      Pass appropriate classes
        job.setJarByClass(Recursive.class);
        job.setMapperClass(joinMapper.class);
        job.setPartitionerClass(joinPartitioner.class);
        job.setReducerClass(joinReducer.class);

//      Specify input and output type of reducers
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(temp)){ fs.delete(temp, true);}
        if(fs.exists(out)) { fs.delete(out, true); }

//      Specify the input and output paths  
        FileInputFormat.addInputPath(job, firstRelation);
        FileInputFormat.addInputPath(job, secondRelation);
        FileOutputFormat.setOutputPath(job, temp);
        /****** End of first job configuration *******/
        job.submit();
//      Running the first job
        boolean b = job.waitForCompletion(true);
        if(b){
//          try to execute the second job after completion of the first one
            round++;                    // Specify round number
            Configuration conf2 = new Configuration();  // Create new configuration object

            /****** Configuring second job *******/
//          General configuration
            Job job2 = Job.getInstance(conf2, "Reduce multi-way join (second round)");
            job2.setNumReduceTasks(noReducers);

//          Pass appropriate classes
            job2.setJarByClass(Recursive.class);
            job2.setMapperClass(joinMapper.class);
            job2.setPartitionerClass(joinPartitioner.class);
            job2.setReducerClass(joinReducer.class);

//          Specify input and output type of reducers
            job2.setOutputKeyClass(Text.class);
            job2.setOutputValueClass(Text.class);

//          Specify input and output type of mappers
            job2.setMapOutputKeyClass(IntWritable.class);
            job2.setMapOutputValueClass(Text.class);
//          End of 2014-11-25
//          Specify the input and output paths  
            FileInputFormat.addInputPath(job2, temp);
            FileInputFormat.addInputPath(job2, thirdRelation);
            FileOutputFormat.setOutputPath(job2, out);
            /****** End of second job configuration *******/
            job2.submit();
//          Running the second job
            b = job2.waitForCompletion(true);

//          Output time measurement
            long e = System.currentTimeMillis() - s;
            System.out.println("Total: " + e);
            System.exit(b ? 0 : 1);
        }
        System.exit(1);
    }

}
asked Dec 02 '14 by Iraj Hedayati
1 Answer

I had a similar error, ended up on your question, and found this mailing-list thread: EBADF: Bad file descriptor

To clarify a little bit, the readahead pool can sometimes spit out this message if you close a file while a readahead request is in flight. It's not an error and just reflects the fact that the file was closed hastily, probably because of some other bug which is the real problem.
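
To make that race concrete, here is a simplified, self-contained illustration. It uses a plain FileInputStream and an executor rather than Hadoop's actual ReadaheadPool internals, so treat it only as an analogy: a background task is scheduled against an open file, the main thread closes the file before the task runs, and the task then finds the stream unusable. Hadoop handles the equivalent native failure by logging the warning and carrying on.

import java.io.FileInputStream;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ReadaheadRaceSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        FileInputStream in = new FileInputStream("/etc/hosts");  // any readable file works here

        // Analogous to a readahead request: it is queued now but only touches
        // the stream some time later, while the request is still "in flight".
        pool.submit(() -> {
            try {
                Thread.sleep(100);
                in.read();                       // the stream may already be closed here
            } catch (IOException e) {
                // Hadoop logs the equivalent situation as
                // "WARN io.ReadaheadPool ... EBADF" and moves on.
                System.err.println("read after close: " + e.getMessage());
            } catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
        });

        in.close();                              // main thread closes the file early
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}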

In my case, I was closing a writer without flushing it with hflush().
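
For reference, a minimal sketch of the pattern that fixed it for me (the path and payload are made up; the point is the hflush() before close()):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FlushBeforeClose {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        FSDataOutputStream out = fs.create(new Path("/tmp/flush-example"));  // hypothetical path
        out.writeBytes("some data\n");
        out.hflush();   // push buffered data out before closing
        out.close();    // closing without the flush is what produced the warning for me
    }
}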

Since you don't seem to use a writer or a reader by hand, I would have a look at how you are submitting the MR job.
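
One thing I did notice in your main method: each job is started with job.submit() and then also passed to job.waitForCompletion(true). waitForCompletion already submits the job if it hasn't been submitted yet, so the usual pattern is just the single call. A sketch, with the configuration elided because it is the same as in your code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SubmitOnce {
    static boolean runRound(Configuration conf, String name) throws Exception {
        Job job = Job.getInstance(conf, name);
        // ... set jar, mapper, partitioner, reducer, key/value types and paths, as in the question ...
        return job.waitForCompletion(true);   // submits the job and blocks until it finishes
    }
}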

answered Sep 19 '22 by AdrieanKhisbe