Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Slow Parquet write to HDFS using Spark

I am using Spark 1.6.1 and writing to HDFS. In some cases it seems like all the work is being done by one thread. Why is that?

Also, I need parquet.enable.summary-metadata to register the parquet files to Impala.

Df.write().partitionBy("COLUMN").parquet(outputFileLocation);

It also, seems like all of this happens in one cpu of a executor.

16/11/03 14:59:20 INFO datasources.DynamicPartitionWriterContainer: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
16/11/03 14:59:20 INFO mapred.SparkHadoopMapRedUtil: No need to commit output of task because needsTaskCommit=false: attempt_201611031459_0154_m_000029_0
16/11/03 15:17:56 INFO sort.UnsafeExternalSorter: Thread 545 spilling sort data of 41.9 GB to disk (3  times so far)
16/11/03 15:21:05 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
16/11/03 15:21:05 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
16/11/03 15:21:05 INFO datasources.DynamicPartitionWriterContainer: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
16/11/03 15:21:05 INFO codec.CodecConfig: Compression: GZIP
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Parquet block size to 134217728
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Parquet page size to 1048576
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Parquet dictionary page size to 1048576
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Dictionary is on
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Validation is off
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Writer version is: PARQUET_1_0
16/11/03 15:21:05 INFO parquet.CatalystWriteSupport: Initialized Parquet WriteSupport with Catalyst schema:

Then again :-

16/11/03 15:21:05 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/11/03 15:21:05 INFO datasources.DynamicPartitionWriterContainer: Maximum partitions reached, falling back on sorting.
16/11/03 15:32:37 INFO sort.UnsafeExternalSorter: Thread 545 spilling sort data of 31.8 GB to disk (0  time so far)
16/11/03 15:45:47 INFO sort.UnsafeExternalSorter: Thread 545 spilling sort data of 31.8 GB to disk (1  time so far)
16/11/03 15:48:44 INFO datasources.DynamicPartitionWriterContainer: Sorting complete. Writing out partition files one at a time.
16/11/03 15:48:44 INFO codec.CodecConfig: Compression: GZIP
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Parquet block size to 134217728
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Parquet page size to 1048576
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Parquet dictionary page size to 1048576
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Dictionary is on
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Validation is off
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Writer version is: PARQUET_1_0
16/11/03 15:48:44 INFO parquet.CatalystWriteSupport: Initialized Parquet WriteSupport with Catalyst schema:

The Schema

About 200 of the following lines again and again 20 times or so.

16/11/03 15:48:44 INFO compress.CodecPool: Got brand-new compressor [.gz]
16/11/03 15:49:50 INFO hadoop.InternalParquetRecordWriter: mem size 135,903,551 > 134,217,728: flushing 1,040,100 records to disk.
16/11/03 15:49:50 INFO hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 89,688,651

About 200 of the following lines

16/11/03 15:49:51 INFO hadoop.ColumnChunkPageWriteStore: written 413,231B for [a17bbfb1_2808_11e6_a4e6_77b5e8f92a4f] BINARY: 1,040,100 values, 1,138,534B raw, 412,919B comp, 8 pages, encodings: [RLE, BIT_PACKED, PLAIN_DICTIONARY], dic { 356 entries, 2,848B raw, 356B comp}

Then at last:-

16/11/03 16:15:41 INFO output.FileOutputCommitter: Saved output of task 'attempt_201611031521_0154_m_000040_0' to hdfs://PATH/_temporary/0/task_201611031521_0154_m_000040
16/11/03 16:15:41 INFO mapred.SparkHadoopMapRedUtil: attempt_201611031521_0154_m_000040_0: Committed
16/11/03 16:15:41 INFO executor.Executor: Finished task 40.0 in stage 154.0 (TID 8545). 3757 bytes result sent to driver

Update: parquet.enable.summary-metadata set to false.
Reduced partitions to 21.

Df.write().mode(SaveMode.Append).partitionBy("COL").parquet(outputFileLocation);

It did improve speed but still takes an hour to complete.

Update :- The reason for most of the issue is multiple left outer join with very small data being materialized just before write. The spills are happening because of Append mode which keeps file open. The is default limit of 5 open files in this mode. You can increase this using property "spark.sql.sources.maxConcurrentWrites"

like image 966
morfious902002 Avatar asked Nov 03 '16 18:11

morfious902002


People also ask

Why does spark work better with Parquet?

Parquet has higher execution speed compared to other standard file formats like Avro,JSON etc and it also consumes less disk space in compare to AVRO and JSON.

Why Parquet is best for spark and not Orc?

One key difference between the two is that ORC is better optimized for Hive, whereas Parquet works really well with Apache Spark. In fact, Parquet is the default file format for writing and reading data in Apache Spark.

Does spark support Parquet?

Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema of the original data. When reading Parquet files, all columns are automatically converted to be nullable for compatibility reasons.


1 Answers

Finally after some optimizations in the code before reaching the write part we got better write times. Before we could not do repartition as the shuffles were more than 4-5 Gb. After previous changes, I changed the code from coalesce to repartition which distributed the data across all executors there by giving each CPU in executors about the same amount of data to write. So, if you see that the parquet files created by your jobs vary in size than try to repartition your Dataframe before write.

Also, this can help with write performance too :-

sc.hadoopConfiguration.set("parquet.enable.dictionary", "false")
like image 159
morfious902002 Avatar answered Oct 03 '22 13:10

morfious902002