 

Spark + Parquet + Snappy: overall compression ratio gets worse after Spark shuffles the data

Community!

Please help me understand how to get a better compression ratio with Spark.

Let me describe the case:

  1. I have a dataset, let's call it product, on HDFS, which was imported using Sqoop ImportTool as-parquet-file with the snappy codec. As a result of the import, I have 100 files totalling 46 GB (du), with varying file sizes (min 11 MB, max 1.5 GB, avg ~500 MB). The total record count is a little over 8 billion, with 84 columns.

  2. I'm doing a simple read/repartition/write with Spark, using snappy as well, and as a result I'm getting:

~100 GB output size with the same file count, the same codec, the same record count and the same columns.

Code snippet:

val productDF = spark.read.parquet("/ingest/product/20180202/22-43/")

productDF
.repartition(100)
.write.mode(org.apache.spark.sql.SaveMode.Overwrite)
.option("compression", "snappy")
.parquet("/processed/product/20180215/04-37/read_repartition_write/general")
  3. Using parquet-tools, I looked into random files from both ingest and processed, and they look as below:

ingest:

creator:                        parquet-mr version 1.5.0-cdh5.11.1 (build ${buildNumber}) 
extra:                          parquet.avro.schema = {"type":"record","name":"AutoGeneratedSchema","doc":"Sqoop import of QueryResult","fields"

and almost all columns look like
AVAILABLE: OPTIONAL INT64 R:0 D:1

row group 1:                    RC:3640100 TS:36454739 OFFSET:4 

AVAILABLE:                       INT64 SNAPPY DO:0 FPO:172743 SZ:370515/466690/1.26 VC:3640100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 126518400000, max: 1577692800000, num_nulls: 2541633]

processed:

creator:                        parquet-mr version 1.5.0-cdh5.12.0 (build ${buildNumber}) 
extra:                          org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields"

AVAILABLE:                      OPTIONAL INT64 R:0 D:1
...

row group 1:                    RC:6660100 TS:243047789 OFFSET:4 

AVAILABLE:                       INT64 SNAPPY DO:0 FPO:4122795 SZ:4283114/4690840/1.10 VC:6660100 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE ST:[min: -2209136400000, max: 10413820800000, num_nulls: 4444993]

On the other hand, without repartitioning, or when using coalesce, the size remains close to the ingest data size (a sketch of the coalesce variant is below).
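For reference, the coalesce variant I compared against looks roughly like this (the output path here is just illustrative); coalesce only merges existing partitions instead of doing a full shuffle:

// coalesce(100) merges existing partitions without a round-robin shuffle,
// so the original row ordering inside each file is preserved.
productDF
  .coalesce(100)
  .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
  .option("compression", "snappy")
  .parquet("/processed/product/20180215/04-37/read_coalesce_write/general")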

  4. Going further, I did the following:

    • read the dataset and write it back with

      productDF
        .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
        .option("compression", "none")
        .parquet("/processed/product/20180215/04-37/read_repartition_write/nonewithoutshuffle")
      
    • read the dataset, repartition it and write it back with

      productDF
        .repartition(500)
        .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
        .option("compression", "none")
        .parquet("/processed/product/20180215/04-37/read_repartition_write/nonewithshuffle")
      

As a result: 80 GB without repartition and 283 GB with repartition, with the same number of output files.

80GB parquet meta example:

AVAILABLE:                       INT64 UNCOMPRESSED DO:0 FPO:456753 SZ:1452623/1452623/1.00 VC:11000100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: -1735747200000, max: 2524550400000, num_nulls: 7929352]

283 GB parquet meta example:

AVAILABLE:                       INT64 UNCOMPRESSED DO:0 FPO:2800387 SZ:2593838/2593838/1.00 VC:3510100 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: -2209136400000, max: 10413820800000, num_nulls: 2244255]

It seems that Parquet itself (with its encodings?) greatly reduces the data size, even when the data is uncompressed. How? :)

I tried to read the uncompressed 80 GB, repartition it and write it back, and I got my 283 GB again.

  • The first question for me is: why am I getting a bigger size after the Spark repartitioning/shuffle?

  • The second is: how do I efficiently shuffle data in Spark so as to benefit from Parquet encoding/compression, if there is a way?

In general, I don't want my data size to grow after Spark processing, even if I didn't change anything.

Also, I couldn't find out whether there is a configurable compression level for snappy, e.g. -1 ... -9. As far as I know, gzip has this, but what is the way to control this level in the Spark/Parquet writer?
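The only related knob I could find so far is choosing the codec itself, either per write or globally for the session, roughly like this (gzip and the output path are just examples):

// Per-write codec choice (no level setting exposed for snappy):
productDF
  .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
  .option("compression", "gzip")   // e.g. "snappy", "gzip", "none"
  .parquet("/processed/product/20180215/04-37/read_repartition_write/gzip_test")

// Or globally for the session:
spark.conf.set("spark.sql.parquet.compression.codec", "gzip")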

I appreciate any help!

Thanks!

Mikhail Dubkov, asked Feb 18 '18




1 Answer

When you call repartition(n) on a DataFrame, you are doing round-robin partitioning. Any data locality that existed prior to repartitioning is gone and the entropy has gone up, so run-length and dictionary encoders, as well as compression codecs, don't really have much to work with.

So when you repartition, you need to use the repartition(n, col) version. Give it a good column that preserves data locality.

Also, since you are probably optimizing your Sqooped tables for downstream jobs, you can use sortWithinPartitions for faster scans.

df.repartition(100, $"userId").sortWithinPartitions("userId").write.parquet(...)
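Applied to the product dataset from the question, it would look something like the sketch below; "product_id" is just a placeholder, so pick whichever of your 84 columns actually groups similar rows together:

// Hypothetical example: "product_id" stands in for whatever column best
// preserves data locality. The $-syntax assumes `import spark.implicits._`.
productDF
  .repartition(100, $"product_id")
  .sortWithinPartitions("product_id")
  .write.mode(org.apache.spark.sql.SaveMode.Overwrite)
  .option("compression", "snappy")
  .parquet("/processed/product/20180215/04-37/repartition_by_key")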

user1509458, answered Oct 04 '22