I have a task which writes Avro output into multiple directories organized by a few fields of the input records.
For example: process records of countries across years and write them into a country/year directory structure, e.g. outputs/usa/2015/outputs_usa_2015.avro and outputs/uk/2014/outputs_uk_2014.avro
AvroMultipleOutputs multipleOutputs = new AvroMultipleOutputs(context);
....
....
multipleOutputs.write("output", avroKey, NullWritable.get(),
    OUTPUT_DIR + "/" + record.getCountry() + "/" + record.getYear()
        + "/outputs_" + record.getCountry() + "_" + record.getYear());
Which output committer would the code above use to write the output? Is it unsafe to use with speculative execution? With speculative execution this causes (or may cause) org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException.
In the post "Hadoop Reducer: How can I output to multiple directories using speculative execution?" it is suggested to use a custom output committer.
The code below, from Hadoop's AvroMultipleOutputs, does not mention any problem with speculative execution:
private synchronized RecordWriter getRecordWriter(TaskAttemptContext taskContext,
    String baseFileName) throws IOException, InterruptedException {
  writer =
      ((OutputFormat) ReflectionUtils.newInstance(taskContext.getOutputFormatClass(),
          taskContext.getConfiguration())).getRecordWriter(taskContext);
  ...
}
Nor does the write method document any issues when baseOutputPath is outside the job's output directory:
public void write(String namedOutput, Object key, Object value, String baseOutputPath)
Is there a real issue with AvroMultipleOutputs (and other multiple-output classes) and speculative execution when writing outside the job directory? If so, how do I override AvroMultipleOutputs to have its own output committer? I can't see any OutputFormat inside AvroMultipleOutputs whose output committer it could use.
AvroMultipleOutputs will use the OutputFormat that you registered in the job configuration when adding the named output, e.g. using the addNamedOutput API from AvroMultipleOutputs (for example AvroKeyValueOutputFormat).
With AvroMultipleOutputs you might not be able to use the speculative task execution feature, and overriding it would either not help or not be simple.
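For reference, a minimal driver-side sketch of registering the named output is below; the job name and the inline schema are placeholders, and AvroKeyOutputFormat is used only because the question writes an AvroKey with a NullWritable value:

import org.apache.avro.Schema;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class Driver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "country-year-output");

    // Placeholder schema; use the schema of your actual record type.
    Schema recordSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":["
        + "{\"name\":\"country\",\"type\":\"string\"},"
        + "{\"name\":\"year\",\"type\":\"int\"}]}");

    // "output" must match the name passed to multipleOutputs.write("output", ...);
    // the OutputFormat class registered here is the one AvroMultipleOutputs resolves
    // for that named output.
    AvroMultipleOutputs.addNamedOutput(job, "output", AvroKeyOutputFormat.class, recordSchema);
  }
}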
Instead, you should write your own OutputFormat (most probably extending one of the available Avro output formats, e.g. AvroKeyValueOutputFormat) and override/implement its getRecordWriter API so that it returns a single RecordWriter instance, say MainRecordWriter (the name is just for reference).
This MainRecordWriter would maintain a map of RecordWriter instances (e.g. AvroKeyValueRecordWriter), each belonging to one of the output files. In the write API of MainRecordWriter you would look up the actual RecordWriter in the map (based on the record you are about to write) and write the record with that writer. So MainRecordWriter would effectively work as a wrapper over multiple RecordWriter instances, as sketched below.
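Here is a minimal sketch of that wrapper approach. The class names (CountryYearAvroOutputFormat, MainRecordWriter), the hard-coded schema and the country/year field names are all placeholders, and for brevity the underlying files are written with Avro's DataFileWriter rather than an Avro record writer class; the key point is that every file is created under the committer's work path, so speculative attempts stay isolated until commit:

import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Illustrative names only; nothing here is shipped with Avro or Hadoop.
public class CountryYearAvroOutputFormat
    extends FileOutputFormat<AvroKey<GenericRecord>, NullWritable> {

  // Placeholder writer schema; in a real job read it from the configuration.
  private static final Schema SCHEMA = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":["
      + "{\"name\":\"country\",\"type\":\"string\"},"
      + "{\"name\":\"year\",\"type\":\"int\"}]}");

  @Override
  public RecordWriter<AvroKey<GenericRecord>, NullWritable> getRecordWriter(TaskAttemptContext context)
      throws IOException {
    // Writing under the committer's work path keeps speculative attempts isolated;
    // only the committed attempt's files are promoted to the final output directory.
    Path workPath = ((FileOutputCommitter) getOutputCommitter(context)).getWorkPath();
    return new MainRecordWriter(context, workPath);
  }

  /** Wrapper that lazily opens one Avro file per country/year sub-directory. */
  private static class MainRecordWriter
      extends RecordWriter<AvroKey<GenericRecord>, NullWritable> {

    private final TaskAttemptContext context;
    private final Path workPath;
    private final Map<String, DataFileWriter<GenericRecord>> writers = new HashMap<>();

    MainRecordWriter(TaskAttemptContext context, Path workPath) {
      this.context = context;
      this.workPath = workPath;
    }

    @Override
    public void write(AvroKey<GenericRecord> key, NullWritable value) throws IOException {
      GenericRecord record = key.datum();
      String subDir = record.get("country") + "/" + record.get("year");
      DataFileWriter<GenericRecord> writer = writers.get(subDir);
      if (writer == null) {
        // One file per sub-directory, per task, inside the attempt's work path.
        Path file = new Path(workPath, subDir + "/part-"
            + context.getTaskAttemptID().getTaskID().getId() + ".avro");
        FileSystem fs = file.getFileSystem(context.getConfiguration());
        OutputStream out = fs.create(file, false);
        writer = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(SCHEMA));
        writer.create(SCHEMA, out);
        writers.put(subDir, writer);
      }
      writer.append(record);
    }

    @Override
    public void close(TaskAttemptContext ctx) throws IOException {
      for (DataFileWriter<GenericRecord> w : writers.values()) {
        w.close();
      }
    }
  }
}

Because each attempt writes only below its own work path, the usual FileOutputCommitter can promote the winning attempt's files (including the country/year sub-directories) to the final output directory and discard the rest.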
For a similar implementation, you might like to study the code of the MultiStorage class from the piggybank library.