I have a task that writes Avro output into multiple directories organized by a few fields of the input records.
For example: process records of countries across years and write them into a country/year directory structure, e.g. outputs/usa/2015/outputs_usa_2015.avro, outputs/uk/2014/outputs_uk_2014.avro
AvroMultipleOutputs multipleOutputs = new AvroMultipleOutputs(context);
....
....
multipleOutputs.write("output", avroKey, NullWritable.get(),
    OUTPUT_DIR + "/" + record.getCountry() + "/" + record.getYear()
        + "/outputs_" + record.getCountry() + "_" + record.getYear());
Which output committer would the code above use to write the output? Is it unsafe to use with speculative execution? With speculative execution this causes (or may cause) org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException.
In the post "Hadoop Reducer: How can I output to multiple directories using speculative execution?" it is suggested to use a custom output committer.
The code below from AvroMultipleOutputs does not mention any problem with speculative execution:
private synchronized RecordWriter getRecordWriter(TaskAttemptContext taskContext,
    String baseFileName) throws IOException, InterruptedException {
  writer =
      ((OutputFormat) ReflectionUtils.newInstance(taskContext.getOutputFormatClass(),
          taskContext.getConfiguration())).getRecordWriter(taskContext);
  ...
}
Neither does the write method document any issues if the baseOutputPath is outside the job directory:
public void write(String namedOutput, Object key, Object value, String baseOutputPath)
Is there a real issue with AvroMultipleOutputs (and other outputs) under speculative execution when writing outside the job directory? If so, how do I override AvroMultipleOutputs to have its own output committer? I can't see any OutputFormat inside AvroMultipleOutputs whose output committer it uses.
AvroMultipleOutputs will use the OutputFormat that you registered in the job configuration when adding the named output, e.g. via the addNamedOutput API of AvroMultipleOutputs (for example, AvroKeyValueOutputFormat).
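For instance, a driver-side registration could look like the sketch below. The named output "output" matches the name used in the write(...) call above; AvroKeyOutputFormat is chosen here to match the AvroKey/NullWritable pair being written, and OutputRecord is an assumed Avro specific record class standing in for your own schema.

import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

// Driver setup (sketch): register the named output so AvroMultipleOutputs knows
// which OutputFormat and schema to use for it at write time.
Job job = Job.getInstance(new Configuration(), "country-year-avro");
AvroMultipleOutputs.addNamedOutput(job, "output",
    AvroKeyOutputFormat.class,          // OutputFormat used for this named output
    OutputRecord.getClassSchema());     // key schema (OutputRecord is an assumed class)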
With AvroMultipleOutputs, you might not be able to use the speculative task execution feature. Overriding it either would not help or would not be simple.
Instead, you should write your own OutputFormat (most probably extending one of the available Avro output formats, e.g. AvroKeyValueOutputFormat) and override/implement its getRecordWriter API so that it returns a single RecordWriter instance, say MainRecordWriter (the name is just for reference).
This MainRecordWriter would maintain a map of RecordWriter instances (e.g. AvroKeyValueRecordWriter). Each of these RecordWriter instances would belong to one of the output files. In the write API of MainRecordWriter, you would look up the actual RecordWriter instance in the map (based on the record you are about to write) and write the record using that writer. So MainRecordWriter would just work as a wrapper over multiple RecordWriter instances.
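A minimal sketch of that wrapper idea follows, under a few assumptions that are not from the answer itself: the key is an AvroKey<GenericRecord> with "country" and "year" fields, the output key schema is obtainable via AvroJob.getOutputKeySchema, and the class names CountryYearOutputFormat / MainRecordWriter are purely illustrative. To keep the sketch self-contained it uses Avro's DataFileWriter per partition instead of wrapping AvroKeyRecordWriter instances, and it creates each partition file under the task attempt's work directory so the standard FileOutputCommitter still decides which attempt's files get promoted:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CountryYearOutputFormat
    extends FileOutputFormat<AvroKey<GenericRecord>, NullWritable> {

  @Override
  public RecordWriter<AvroKey<GenericRecord>, NullWritable> getRecordWriter(
      TaskAttemptContext context) throws IOException, InterruptedException {
    return new MainRecordWriter(context);
  }

  // Wrapper that lazily opens one underlying writer per country/year partition.
  private class MainRecordWriter
      extends RecordWriter<AvroKey<GenericRecord>, NullWritable> {

    private final TaskAttemptContext context;
    private final Map<String, DataFileWriter<GenericRecord>> writers = new HashMap<>();

    MainRecordWriter(TaskAttemptContext context) {
      this.context = context;
    }

    @Override
    public void write(AvroKey<GenericRecord> key, NullWritable value) throws IOException {
      GenericRecord record = key.datum();
      String partition = record.get("country") + "/" + record.get("year");
      DataFileWriter<GenericRecord> writer = writers.get(partition);
      if (writer == null) {
        writer = createWriterFor(partition, context);
        writers.put(partition, writer);
      }
      writer.append(record);
    }

    @Override
    public void close(TaskAttemptContext ctx) throws IOException {
      for (DataFileWriter<GenericRecord> writer : writers.values()) {
        writer.close();
      }
    }
  }

  // Open an Avro container file for one partition under the task's work directory,
  // so the output committer can still promote only the committed attempt's files.
  private DataFileWriter<GenericRecord> createWriterFor(
      String partition, TaskAttemptContext context) throws IOException {
    Schema schema = AvroJob.getOutputKeySchema(context.getConfiguration());
    Path workDir = getDefaultWorkFile(context, "").getParent();
    Path file = new Path(new Path(workDir, partition),
        "outputs_" + partition.replace('/', '_') + ".avro");
    FileSystem fs = file.getFileSystem(context.getConfiguration());
    DataFileWriter<GenericRecord> writer =
        new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema));
    return writer.create(schema, fs.create(file, false));
  }
}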
For a similar implementation, you might like to study the code of the MultiStorage class from the piggybank library.