
spark parquet write gets slow as partitions grow

I have a Spark Streaming application that writes Parquet data from a stream.

sqlContext.sql(
  """
    |select
    |  to_date(from_utc_timestamp(from_unixtime(at), 'US/Pacific')) as event_date,
    |  hour(from_utc_timestamp(from_unixtime(at), 'US/Pacific')) as event_hour,
    |  *
    |from events
    |where at >= 1473667200
  """.stripMargin)
  .coalesce(1)
  .write
  .mode(SaveMode.Append)
  .partitionBy("event_date", "event_hour", "verb")
  .parquet(Config.eventsS3Path)

This piece of code runs every hour, but over time the Parquet write has slowed down. When we started, it took 15 minutes to write the data; now it takes 40 minutes. The time taken is proportional to the data already existing in that path. I tried running the same application against a new location, and that runs fast.

I have disabled schemaMerge and summary metadata:

sparkConf.set("spark.sql.hive.convertMetastoreParquet.mergeSchema","false")
sparkConf.set("parquet.enable.summary-metadata","false")

Using Spark 2.0.
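For reference, a minimal sketch of applying those same two options when building a Spark 2.0 SparkSession (this is not the asker's actual setup; the app name is a placeholder, only the option keys come from the question):

import org.apache.spark.sql.SparkSession

// Sketch: the two options from the question set via the SparkSession builder.
val spark = SparkSession.builder()
  .appName("events-stream")
  .config("spark.sql.hive.convertMetastoreParquet.mergeSchema", "false")
  .config("parquet.enable.summary-metadata", "false")
  .getOrCreate()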

Batch execution times, shown in Spark UI screenshots (omitted here): writing to an empty directory vs. writing to a directory that already contains about 350 partition folders.

Gaurav Shah asked Sep 16 '16 06:09

People also ask

Can a Parquet file be partitioned?

An ORC or Parquet file contains data columns. To these files you can add partition columns at write time. The data files do not store values for partition columns; instead, when writing the files you divide them into groups (partitions) based on column values.
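For illustration, a minimal sketch of that in Spark (the column names, values, and output path below are made up for this example):

import org.apache.spark.sql.SparkSession

// Illustrative only: a tiny DataFrame written with a partition column.
val spark = SparkSession.builder().appName("partition-demo").getOrCreate()
import spark.implicits._

val demo = Seq(
  ("2016-09-16", 10, "click"),
  ("2016-09-16", 11, "view")
).toDF("event_date", "event_hour", "verb")

// The partition column's values end up in the directory names
// (event_date=2016-09-16/...), not inside the Parquet data files themselves.
demo.write.mode("overwrite").partitionBy("event_date").parquet("/tmp/partition-demo")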

Is it better to have one large Parquet file or lots of smaller Parquet files?

Generally, Parquet files (and, for that matter, all files) should be larger than the HDFS block size (128 MB by default). We use the coalesce function with a Hive context and 50 executors for one of our files, which is ~15 GB, and it runs like a charm.
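A hedged sketch of that compaction pattern (the paths are hypothetical; the 50 is the figure quoted above and should be tuned per dataset):

import org.apache.spark.sql.SparkSession

// Hypothetical compaction job: read a large dataset and coalesce it into
// fewer, larger Parquet files before writing it back out.
val spark = SparkSession.builder().appName("compaction-demo").getOrCreate()
val largeDf = spark.read.parquet("/data/raw/events") // ~15 GB in the quoted example

largeDf.coalesce(50) // 50 output files, as in the quote above
  .write.mode("overwrite")
  .parquet("/data/compacted/events")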

What is a good number of partitions in Spark?

The general recommendation for Spark is to have 4x as many partitions as there are cores available to the application in the cluster; as an upper bound, each task should take at least 100 ms to execute.
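As a sketch of applying that rule of thumb (everything below is hypothetical, not taken from the question):

import org.apache.spark.sql.SparkSession

// defaultParallelism usually reflects the total number of executor cores.
val spark = SparkSession.builder().appName("repartition-demo").getOrCreate()
val df = spark.read.parquet("/data/events")

val targetPartitions = spark.sparkContext.defaultParallelism * 4 // 4x-cores rule of thumb
df.repartition(targetPartitions)
  .write.mode("overwrite")
  .parquet("/data/events-repartitioned")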




1 Answer

I've encountered this issue. The append mode is probably the culprit, in that finding the append location takes more and more time as the size of your parquet file grows.

One workaround I've found that solves this is to change the output path regularly. Merging and reordering the data from all the output dataframes is then usually not an issue.

// Hour bucket since an application-defined origin (time.milliseconds and
// timeOrigin come from the streaming job's own context).
def appendix: String = ((time.milliseconds - timeOrigin) / (3600 * 1000)).toString

// Each hour's batch goes to its own path, so append never rescans old data.
df.write.mode(SaveMode.Append).format("parquet").save(s"${outputPath}-H$appendix")
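If you later need the data as one logical dataset, the hourly outputs can be read back together with a path glob; a sketch, assuming the same spark session and outputPath as in the snippet above:

// Read every hourly output into a single DataFrame; the glob matches
// the -H<n> suffix produced by the appendix helper above.
val allEvents = spark.read.parquet(s"${outputPath}-H*")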
Francois G answered Sep 19 '22 15:09