How to Set spark.sql.parquet.output.committer.class in pyspark

I'm trying to set spark.sql.parquet.output.committer.class and nothing I do seems to get the setting to take effect.

I'm trying to have many threads write to the same output folder, which would work with org.apache.spark.sql.parquet.DirectParquetOutputCommitter since it wouldn't use the _temporary folder. I'm getting the following error, which is how I know it's not working:

Caused by: java.io.FileNotFoundException: File hdfs://path/to/stuff/_temporary/0/task_201606281757_0048_m_000029/some_dir does not exist.
        at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:795)
        at org.apache.hadoop.hdfs.DistributedFileSystem.access$700(DistributedFileSystem.java:106)
        at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:853)
        at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:849)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:849)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:382)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:384)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:326)
        at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:46)
        at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:230)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:151)

Note the call to org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob: that's the default committer class, not the one I'm trying to set.

I've tried the following, based on other SO answers and searches:

  1. sc._jsc.hadoopConfiguration().set(key, val) (this does work for settings like parquet.enable.summary-metadata)
  2. dataframe.write.option(key, val).parquet
  3. Adding --conf "spark.hadoop.spark.sql.parquet.output.committer.class=org.apache.spark.sql.parquet.DirectParquetOutputCommitter" to the spark-submit call
  4. Adding --conf "spark.sql.parquet.output.committer.class"="org.apache.spark.sql.parquet.DirectParquetOutputCommitter" to the spark-submit call.

That's all I've been able to find, and nothing works. It looks like this isn't hard to set in Scala, but it appears to be impossible in Python.
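
For reference, attempts 1 and 2 look roughly like this in PySpark. The keys and class name are the ones above; the DataFrame and output path are placeholders, so treat this as a sketch of the pattern rather than my exact job:

    from pyspark import SparkContext
    from pyspark.sql import SQLContext

    sc = SparkContext(appName="committer-test")
    sqlContext = SQLContext(sc)

    key = "spark.sql.parquet.output.committer.class"
    cls = "org.apache.spark.sql.parquet.DirectParquetOutputCommitter"

    # Attempt 1: set it on the underlying Hadoop configuration
    # (this route does work for settings like parquet.enable.summary-metadata)
    sc._jsc.hadoopConfiguration().set(key, cls)

    # Attempt 2: pass it as a writer option
    df = sqlContext.range(100)
    df.write.option(key, cls).parquet("hdfs:///path/to/output")

Attempts 3 and 4 are the two spark-submit --conf variants quoted in the list above.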

KFB, asked Jun 28 '16

1 Answer

The approach in this comment did work for me in the end; the following log line confirmed that the committer setting was being picked up:

16/06/28 18:49:59 INFO ParquetRelation: Using user defined output committer for Parquet: org.apache.spark.sql.execution.datasources.parquet.DirectParquetOutputCommitter

The confirmation was just a log message lost in the flood of output Spark produces, and the error I was seeing turned out to be unrelated. It's all moot now anyway, since DirectParquetOutputCommitter has been removed from Spark (as of 2.0).
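
For completeness, here is a minimal sketch of what the working setup can look like on a Spark version that still ships the class (1.6.x). The mechanism is the hadoopConfiguration route from attempt 1 in the question; whether that matches the linked comment exactly is an assumption, since the link isn't preserved here. One detail worth noting: the INFO line above reports the class under org.apache.spark.sql.execution.datasources.parquet, not the org.apache.spark.sql.parquet package used in the question's attempts.

    from pyspark import SparkContext
    from pyspark.sql import SQLContext

    sc = SparkContext(appName="direct-committer-sketch")
    sqlContext = SQLContext(sc)

    # Sketch only: assumes Spark 1.6.x, where DirectParquetOutputCommitter still exists.
    # The fully qualified name matches the package reported in the INFO log line above.
    sc._jsc.hadoopConfiguration().set(
        "spark.sql.parquet.output.committer.class",
        "org.apache.spark.sql.execution.datasources.parquet.DirectParquetOutputCommitter")

    # If the setting takes effect, the driver log should contain the
    # "Using user defined output committer for Parquet" INFO line quoted above.
    sqlContext.range(100).write.parquet("hdfs:///path/to/output")

Either way, the practical takeaway stands: grep the driver log for that ParquetRelation line to confirm the committer actually took effect, rather than inferring it from an unrelated error.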

KFB, answered Sep 28 '22