
Spark 2.3 dynamic partitionBy not working on S3 AWS EMR 5.13.0

Dynamic partition overwrite, introduced in Spark 2.3, doesn't seem to work on AWS EMR 5.13.0 when writing to S3.

When executing, a temporary directory is created in S3, but it disappears once the process completes, without the new data being written to the final folder structure.

The issue was found when executing a Scala/Spark 2.3 application on EMR 5.13.0.

The configuration is as follows:

val spark = SparkSession
  .builder
  .appName(MyClass.getClass.getSimpleName)
  .getOrCreate()

spark.conf.set("spark.sql.sources.partitionOverwriteMode","DYNAMIC") // also tried "dynamic"
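For what it's worth, the same setting can also be supplied when the session is built rather than mutated afterwards. A sketch, with `"MyApp"` as a placeholder application name; the valid values are "static" (the default) and "dynamic", and Spark normalizes the case, which is why both spellings behave the same:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: supply the conf at session-build time instead of via spark.conf.set.
// "MyApp" is a placeholder; valid values are "static" and "dynamic"
// (case-insensitive).
val spark = SparkSession
  .builder
  .appName("MyApp")
  .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
  .getOrCreate()
```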

The code that writes to S3:

val myDataset : Dataset[MyType] = ...

val w = myDataset
    .coalesce(10)
    .write
    .option("encoding", "UTF-8")
    .option("compression", "snappy")
    .mode("overwrite")
    .partitionBy("col_1","col_2")

w.parquet(s"$destinationPath/${Constants.MyTypeTableName}")

where destinationPath is an S3 bucket/folder path.
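For context, a sketch of what dynamic mode is expected to do, using hypothetical data: only the partitions present in the incoming DataFrame should be replaced, while existing partitions under the target path are left alone. Static mode, by contrast, deletes the whole target directory first.

```scala
import spark.implicits._

// Hypothetical data. With partitionOverwriteMode=dynamic, this overwrite
// should replace only the col_1=A/col_2=1 and col_1=A/col_2=2 partitions;
// any existing col_1=B/... partitions under the path should survive.
Seq(("A", 1, "new"), ("A", 2, "new"))
  .toDF("col_1", "col_2", "value")
  .write
  .mode("overwrite")
  .partitionBy("col_1", "col_2")
  .parquet(s"$destinationPath/example")
```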

Has anyone else experienced this issue?

asked May 10 '18 by David Costa Faidella



1 Answer

Upgrading to EMR 5.19 fixes the problem. However, my previous answer is incorrect: using the EMRFS S3-optimized committer has nothing to do with it. The EMRFS S3-optimized committer is silently skipped when spark.sql.sources.partitionOverwriteMode is set to dynamic: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-committer-reqs.html

If you can upgrade to at least EMR 5.19.0, AWS's EMRFS S3-optimized Committer solves these issues.

--conf spark.sql.parquet.fs.optimized.committer.optimization-enabled=true

See: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-s3-optimized-committer.html
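A sketch of setting that flag at session-build time ("MyApp" is a placeholder name). Note the requirement linked above: the optimized committer is skipped when partitionOverwriteMode is dynamic, so the two settings don't combine.

```scala
import org.apache.spark.sql.SparkSession

// Sketch (EMR 5.19.0+): the committer flag from the answer, set at
// session-build time. Note: the optimized committer is silently skipped
// when spark.sql.sources.partitionOverwriteMode is "dynamic".
val spark = SparkSession
  .builder
  .appName("MyApp") // placeholder
  .config("spark.sql.parquet.fs.optimized.committer.optimization-enabled", "true")
  .getOrCreate()
```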

answered Oct 20 '22 by Robert Bart