
Hadoop MultipleOutputs: AlreadyBeingCreatedException

I do not get the error below when I run the code on a small dataset, but when I run the same code, which uses multiple outputs, on a bigger dataset, it fails with the following exception. Please help!

org.apache.hadoop.ipc.RemoteException: org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException: failed to create file /home/users/mlakshm/alop176/data-r-00001 for DFSClient_attempt_201208010142_0043_r_000001_1 on client 10.0.1.100, because this file is already being created by DFSClient_attempt_201208010142_0043_r_000001_0 on 10.0.1.130
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:1406)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:1246)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1188)
    at org.apache.hadoop.hdfs.server.namenode.NameNode.create(NameNode.java:628)
    at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:563)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1388)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1384)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1382)

    at org.apache.hadoop.ipc.Client.call(Client.java:1070)
    at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
    at $Proxy2.create(Unknown Source)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:616)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
    at $Proxy2.create(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.<init>(DFSClient.java:3248)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:713)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:182)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:555)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:455)
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:118)
    at com.a.MultipleOutputs$InternalFileOutputFormat.getRecordWriter(MultipleOutputs.java:565)
    at com.a.MultipleOutputs.getRecordWriter(MultipleOutputs.java:432)
    at com.a.MultipleOutputs.getCollector(MultipleOutputs.java:518)
    at com.a.MultipleOutputs.getCollector(MultipleOutputs.java:482)
    at com.a.ReduceThree1.reduce(ReduceThree1.java:56)
    at com.a.ReduceThree1.reduce(ReduceThree1.java:1)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:519)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:420)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)

at org.apache.hadoop.mapred.Child.main(Child.java:249)


The reduce class is as follows:

public class ReduceThree1 extends MapReduceBase implements Reducer<Text, Text, Text, Text>{
        //  @SuppressWarnings("unchecked")
        private MultipleOutputs mos;

         public void configure(JobConf conf1) {

         mos = new MultipleOutputs(conf1);

         }

            public void reduce (Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {


                // MultipleOutputs mos;
                 int sum = 0;
                 ArrayList<CustomMapI> alcmap = new ArrayList<CustomMapI>();
                while(values.hasNext())
                {

                    String val = values.next().toString();
                    StringTokenizer st = new StringTokenizer(val);
                    String uid = st.nextToken();
                    String f_val = st.nextToken();
                    CustomMapI cmap = new CustomMapI(uid, f_val);
                    alcmap.add(cmap);
                    sum += Integer.parseInt(f_val);

                }

                StringTokenizer st = new StringTokenizer(key.toString());
                String t = st.nextToken();
                String data = st.nextToken();

                for(int i = 0; i<alcmap.size(); i++)
                {

                    String str_key = t+" "+alcmap.get(i).getUid();
                    String str_val = data+" "+alcmap.get(i).getF_val()+" "+sum;

                 //   output.collect(new Text(str_key), new Text(str_val));
                   mos.getCollector("/home/users/mlakshm/alop176/data", reporter).collect(new Text(str_key), new Text(str_val));

                   for(int j = 1; j<alcmap.size(); j++)
                   {
                       if((j>i)&&(!alcmap.get(i).equals(alcmap.get(j))))
                       {
                           String mul_key = "null";


                           String uidi = alcmap.get(i).getUid();
                           String uidj = alcmap.get(j).getUid();


                          ArrayList<String> alsort = new  ArrayList<String>();
                          alsort.add(uidi);
                          alsort.add(uidj);
                          Collections.sort(alsort);
                          int fi = Integer.parseInt(alcmap.get(i).getF_val());

                          int fj = Integer.parseInt(alcmap.get(j).getF_val());
                          String intersection = "null";
                          if(fi<fj)
                          {
                             intersection = String.valueOf(fi);
                          }
                          else
                          {
                              intersection = String.valueOf(fj);
                          }

                          String mul_val = t+" "+alsort.get(0)+" "+alsort.get(1)+" "+intersection;
                        //   System.out.println(mul_key+ " "+mul_val);

                           mos.getCollector("/home/users/mlakshm/alop177/datepairs", reporter).collect(new Text(mul_key), new Text(mul_val));
                       }
                   }

                }


             }

            public void close() throws IOException {
                 mos.close();

                 }
}

The Job Conf is as follows:

Configuration config1 = new Configuration();

          JobConf conf1 = new JobConf(config1, DJob.class);

          conf1.setJobName("DJob1");
          conf1.setOutputKeyClass(Text.class);
          conf1.setOutputValueClass(Text.class);
         // conf.setMapOutputValueClass(Text.class);
        //  conf.setMapOutputKeyClass(Text.class);
         // conf.setNumMapTasks(20);
          conf1.setNumReduceTasks(10);
          conf1.setMapperClass(MapThree1.class);
         // conf.setCombinerClass(Combiner.class);
          conf1.setReducerClass(ReduceThree1.class);
          conf1.setPartitionerClass(CustomPartitioner.class);

          conf1.setInputFormat(TextInputFormat.class);
          conf1.setOutputFormat(TextOutputFormat.class);
       //   mos = new MultipleOutputs(conf1);
          MultipleOutputs.addNamedOutput(conf1, "/home/users/mlakshm/alop176/data", TextOutputFormat.class, LongWritable.class, Text.class);
          MultipleOutputs.addNamedOutput(conf1, "/home/users/mlakshm/alop177/datepairs", TextOutputFormat.class, LongWritable.class, Text.class);


          FileInputFormat.setInputPaths(conf1, new Path(other_args.get(2)));
          FileOutputFormat.setOutputPath(conf1, new Path(other_args.get(3)));

         JobClient.runJob(conf1);
asked Aug 02 '12 by Mahalakshmi Lakshminarayanan

1 Answer

You most probably have speculative execution turned on, and two different attempts of reduce task 1 are both trying to create the file /home/users/mlakshm/alop176/data-r-00001. This probably succeeds on the smaller dataset because the tasks finish before Hadoop launches a speculative second attempt.
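
If you just want to confirm that speculative execution is the culprit, you can switch it off for the reduce phase and re-run the job. A minimal sketch against the old mapred API your driver already uses, applied to conf1 before submitting the job:

    // Run only one attempt per reduce task, so no second attempt
    // races to create the same output file.
    conf1.setReduceSpeculativeExecution(false);
    // Equivalent property form in Hadoop 1.x:
    // conf1.setBoolean("mapred.reduce.tasks.speculative.execution", false);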

I see your MultipleOutputs implementation is a custom one (com.a.MultipleOutputs). You should be writing all HDFS data to the task's working directory and letting the OutputCommitter move it to the final output directory when the task commits. If you're able to, pastebin the code and we can take a look.
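
For reference, here is a minimal sketch of how the two outputs would look with the stock org.apache.hadoop.mapred.lib.MultipleOutputs, assuming your custom class follows the same contract: named outputs are plain alphanumeric names rather than HDFS paths, and the files are created under the task attempt's work directory, so each attempt writes its own copy and the OutputCommitter promotes the winning attempt's files on commit. The names "data" and "datepairs" below are illustrative:

    // Job setup: register named outputs by name, not by absolute path.
    MultipleOutputs.addNamedOutput(conf1, "data", TextOutputFormat.class, Text.class, Text.class);
    MultipleOutputs.addNamedOutput(conf1, "datepairs", TextOutputFormat.class, Text.class, Text.class);

    // Reducer: collect against the named outputs. The resulting files
    // (data-r-00001, datepairs-r-00001, ...) land in the job output
    // directory only after the task attempt commits.
    mos.getCollector("data", reporter).collect(new Text(str_key), new Text(str_val));
    mos.getCollector("datepairs", reporter).collect(new Text(mul_key), new Text(mul_val));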

answered by Chris White