
Hadoop multiple outputs with speculative execution

I have a task that writes Avro output into multiple directories, organized by a few fields of the input records.

For example, it processes records of countries across years and writes them into a country/year directory structure, e.g.:

outputs/usa/2015/outputs_usa_2015.avro
outputs/uk/2014/outputs_uk_2014.avro

AvroMultipleOutputs multipleOutputs = new AvroMultipleOutputs(context);
....
....
multipleOutputs.write("output", avroKey, NullWritable.get(),
        OUTPUT_DIR + "/" + record.getCountry() + "/" + record.getYear()
                + "/outputs_" + record.getCountry() + "_" + record.getYear());

Which output committer does the above code use to write its output? Is it unsafe to use with speculative execution? With speculative execution enabled, this causes (or may cause) org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException.

The post "Hadoop Reducer: How can I output to multiple directories using speculative execution?" suggests using a custom output committer.

The code below, from AvroMultipleOutputs, does not mention any problem with speculative execution:

private synchronized RecordWriter getRecordWriter(TaskAttemptContext taskContext,
        String baseFileName) throws IOException, InterruptedException {

    writer =
        ((OutputFormat) ReflectionUtils.newInstance(taskContext.getOutputFormatClass(),
            taskContext.getConfiguration())).getRecordWriter(taskContext);
    ...
}

Nor does the write method document any issues when baseOutputPath is outside the job directory:

public void write(String namedOutput, Object key, Object value, String baseOutputPath)

Is there a real issue with AvroMultipleOutputs (and other multiple-output classes) under speculative execution when writing outside the job directory? If so, how do I override AvroMultipleOutputs so that it has its own output committer? I can't see any OutputFormat inside AvroMultipleOutputs whose output committer it would use.

bl3e asked May 05 '15 19:05


1 Answer

AvroMultipleOutputs uses the OutputFormat that you registered in the job configuration when adding the named output, e.g. through the addNamedOutput API of AvroMultipleOutputs (for example, AvroKeyValueOutputFormat).

With AvroMultipleOutputs, you might not be able to use the speculative task execution feature. Overriding the class would either not help or not be simple.
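To make the collision concrete, here is a rough model of how two speculative attempts could end up on the same file. It assumes that each attempt's file is resolved as "attempt work directory + base name + partition suffix" and that an absolute base path ignores the work directory. The class, the method names, the directory names and the suffix format are all hypothetical; this is plain Java, not Hadoop code.

```java
import java.util.Locale;

// Simplified, hypothetical model of per-attempt output path resolution.
// It does not call any Hadoop API.
class SpeculativePathSketch {

    // Resolve child against parent; an absolute child ignores the parent.
    static String resolve(String parent, String child) {
        return child.startsWith("/") ? child : parent + "/" + child;
    }

    // Assumed naming: base name, then "-r-", the partition number, and ".avro".
    static String workFile(String attemptWorkDir, String baseOutputPath, int partition) {
        String name = String.format(Locale.ROOT, "%s-r-%05d.avro", baseOutputPath, partition);
        return resolve(attemptWorkDir, name);
    }

    public static void main(String[] args) {
        // Two speculative attempts of the same reduce task (same partition).
        String attempt0 = "/job/out/_temporary/attempt_0";
        String attempt1 = "/job/out/_temporary/attempt_1";

        // Relative base path: each attempt writes under its own work directory.
        System.out.println(workFile(attempt0, "usa/2015/outputs_usa_2015", 3));
        System.out.println(workFile(attempt1, "usa/2015/outputs_usa_2015", 3));

        // Absolute base path: both attempts resolve to the same file.
        System.out.println(workFile(attempt0, "/outputs/usa/2015/outputs_usa_2015", 3));
        System.out.println(workFile(attempt1, "/outputs/usa/2015/outputs_usa_2015", 3));
    }
}
```

If the real resolution behaves like this model, two attempts writing the same absolute path would compete for one HDFS file, which would be consistent with the LeaseExpiredException reported in the question.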

Instead, you should write your own OutputFormat (most likely extending one of the available Avro output formats, e.g. AvroKeyValueOutputFormat) and override its getRecordWriter API so that it returns a single RecordWriter instance, called MainRecordWriter here for reference.

This MainRecordWriter would maintain a map of RecordWriter (e.g. AvroKeyValueRecordWriter) instances, each belonging to one of the output files. In the write API of MainRecordWriter, you would look up the actual RecordWriter instance in the map (based on the record you are about to write) and write the record with it. MainRecordWriter thus acts as a wrapper over multiple RecordWriter instances.
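The wrapper idea described above can be sketched in plain Java. This is only an illustration of the map-of-writers pattern, not a working Hadoop OutputFormat: SimpleWriter is a hypothetical stand-in for Hadoop's RecordWriter, ListWriter is an in-memory stand-in for a per-file Avro writer, and the record is modeled as a {country, year} string pair.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class MainRecordWriterSketch {

    /** Hypothetical stand-in for Hadoop's RecordWriter; not the real API. */
    interface SimpleWriter<R> {
        void write(R record) throws IOException;
        void close() throws IOException;
    }

    /** Wrapper that routes each record to a per-file writer, created on first use. */
    static final class MainRecordWriter<R> implements SimpleWriter<R> {
        private final Map<String, SimpleWriter<R>> writers = new LinkedHashMap<>();
        private final Function<R, String> pathOf;                // record -> output file path
        private final Function<String, SimpleWriter<R>> factory; // path -> new writer

        MainRecordWriter(Function<R, String> pathOf,
                         Function<String, SimpleWriter<R>> factory) {
            this.pathOf = pathOf;
            this.factory = factory;
        }

        @Override
        public void write(R record) throws IOException {
            // Look up the writer for this record's file, creating it if needed.
            String path = pathOf.apply(record);
            writers.computeIfAbsent(path, factory).write(record);
        }

        @Override
        public void close() throws IOException {
            // Close every underlying writer, then forget them.
            for (SimpleWriter<R> w : writers.values()) {
                w.close();
            }
            writers.clear();
        }

        int openWriterCount() {
            return writers.size();
        }
    }

    /** In-memory writer used only for this demo. */
    static final class ListWriter implements SimpleWriter<String[]> {
        final List<String[]> records = new ArrayList<>();
        boolean closed;

        @Override
        public void write(String[] record) {
            records.add(record);
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    public static void main(String[] args) throws IOException {
        Map<String, ListWriter> created = new LinkedHashMap<>();
        MainRecordWriter<String[]> router = new MainRecordWriter<>(
            r -> r[0] + "/" + r[1] + "/outputs_" + r[0] + "_" + r[1],
            path -> {
                ListWriter w = new ListWriter();
                created.put(path, w);
                return w;
            });

        router.write(new String[] {"usa", "2015"});
        router.write(new String[] {"uk", "2014"});
        router.write(new String[] {"usa", "2015"});
        router.close();

        for (Map.Entry<String, ListWriter> e : created.entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue().records.size() + " record(s)");
        }
    }
}
```

In a real job, the factory would presumably create the per-file writers under the task attempt's work directory, so that the output committer can promote or discard them per attempt; that part is not shown here.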

For a similar implementation, you might study the code of the MultiStorage class from the piggybank library.

Vasu answered Sep 30 '22 03:09