I'm trying to set `spark.sql.parquet.output.committer.class`, and nothing I do seems to make the setting take effect.

I'm trying to have many threads write to the same output folder, which would work with `org.apache.spark.sql.parquet.DirectParquetOutputCommitter`, since it doesn't use the `_temporary` folder.
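For context, the write pattern is roughly this (a sketch; `frames` and the output path are placeholders):

```python
import threading

def write_frame(df):
    # Every thread commits into the same output root. The default
    # FileOutputCommitter stages output under <root>/_temporary, so
    # concurrent jobs can clean up each other's in-flight task
    # directories, which is consistent with the error below.
    df.write.mode("append").parquet("hdfs://path/to/stuff")

# `frames` is a placeholder for a list of DataFrames built elsewhere.
threads = [threading.Thread(target=write_frame, args=(f,)) for f in frames]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

I'm getting the following error, which is how I know the setting isn't taking effect: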
```
Caused by: java.io.FileNotFoundException: File hdfs://path/to/stuff/_temporary/0/task_201606281757_0048_m_000029/some_dir does not exist.
at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:795)
at org.apache.hadoop.hdfs.DistributedFileSystem.access$700(DistributedFileSystem.java:106)
at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:853)
at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:849)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:849)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:382)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:384)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:326)
at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:46)
at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:230)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:151)
```
Note the call to `org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob`, the default committer class.
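One quick sanity check (illustrative; this only shows whether the key is present in the Hadoop configuration, not whether Spark honors it):

```python
# Illustrative check: read the key back from the JVM Hadoop configuration.
# A non-None value only proves the key was set, not that Spark uses it.
committer = sc._jsc.hadoopConfiguration().get(
    "spark.sql.parquet.output.committer.class"
)
print(committer)
```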
I've tried the following, based on other SO answers and searches:

- `sc._jsc.hadoopConfiguration().set(key, val)` (this does work for settings like `parquet.enable.summary-metadata`)
- `dataframe.write.option(key, val).parquet(...)`
- Adding `--conf "spark.hadoop.spark.sql.parquet.output.committer.class=org.apache.spark.sql.parquet.DirectParquetOutputCommitter"` to the `spark-submit` call
- Adding `--conf "spark.sql.parquet.output.committer.class=org.apache.spark.sql.parquet.DirectParquetOutputCommitter"` to the `spark-submit` call

That's all I've been able to find, and nothing works (the first two attempts are sketched below). It looks like it's not hard to set in Scala, but it appears impossible in Python.
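For concreteness, this is roughly what the first two attempts looked like in PySpark (a sketch; `dataframe` and the output path are placeholders, and neither call made the committer take effect for me):

```python
# Attempt 1: set the key on the JVM-side Hadoop configuration.
# This does take effect for settings like parquet.enable.summary-metadata,
# but apparently not for the committer class.
sc._jsc.hadoopConfiguration().set(
    "spark.sql.parquet.output.committer.class",
    "org.apache.spark.sql.parquet.DirectParquetOutputCommitter",
)

# Attempt 2: pass the key as a DataFrameWriter option.
dataframe.write.option(
    "spark.sql.parquet.output.committer.class",
    "org.apache.spark.sql.parquet.DirectParquetOutputCommitter",
).parquet("hdfs://path/to/stuff")
```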
The approach in this comment definitely worked for me:

```
16/06/28 18:49:59 INFO ParquetRelation: Using user defined output committer for Parquet: org.apache.spark.sql.execution.datasources.parquet.DirectParquetOutputCommitter
```
The confirmation was a log message easily lost in the flood of output Spark produces, and the error I was seeing turned out to be unrelated. It's all moot anyway, since DirectParquetOutputCommitter was removed in Spark 2.0.
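For anyone retracing this, here is a minimal end-to-end sketch. To be clear about assumptions: the linked comment isn't reproduced here, so I'm assuming its approach amounts to setting the key on the JVM Hadoop configuration with the fully-qualified Spark 1.6 class name (note the class in the log line above lives under `org.apache.spark.sql.execution.datasources.parquet`, not the `org.apache.spark.sql.parquet` package I used in my attempts). Session setup and paths are placeholders:

```python
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# Illustrative Spark 1.6-era setup; names and paths are placeholders.
sc = SparkContext(conf=SparkConf().setAppName("committer-check"))
sqlContext = SQLContext(sc)

# Set the committer class on the JVM Hadoop configuration before writing.
# Assumption: this is the fully-qualified class name for Spark 1.6, as
# seen in the INFO log line quoted above.
sc._jsc.hadoopConfiguration().set(
    "spark.sql.parquet.output.committer.class",
    "org.apache.spark.sql.execution.datasources.parquet."
    "DirectParquetOutputCommitter",
)

# Any Parquet write should then log:
#   INFO ParquetRelation: Using user defined output committer for Parquet: ...
df = sqlContext.range(10)
df.write.mode("overwrite").parquet("hdfs://path/to/stuff/out")
```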