Why are Spark Parquet files for an aggregate larger than the original?

I am trying to create an aggregate file for end users so that they don't have to process multiple sources with much larger files. To do that, I: A) iterate through all source folders, extract the 12 most commonly requested fields, and write Parquet files to a new location where these results are co-located; B) go back through the files created in step A and re-aggregate them, grouping by the 12 fields to reduce the data to one summary row for each unique combination.

What I'm finding is that step A reduces the payload 5:1 (roughly 250 gigs becomes 48.5 gigs). Step B, however, instead of reducing this further, increases the size by more than 50% over step A. The row counts match, though.

This is using Spark 1.5.2.
My code is below, modified only to replace the field names with field1...field12 for readability, along with the results I've noted.

While I don't necessarily expect another 5:1 reduction, I don't know what I'm doing incorrectly to increase the storage size for fewer rows with the same schema. Is anyone able to help me understand what I did wrong?

Thanks!

//for each eventName found in separate source folders, do the following:
//spit out one row with key fields from the original dataset for quicker availability to clients 
//results in a 5:1 reduction in size
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, cast(1 as bigint) as rCount from table"
sqlContext.sql(sqlStatement).coalesce(20).write.parquet("<aws folder>" + dt + "/" + eventName + "/")
//results in over 700 files with a total of 16,969,050,506 rows consuming 48.65 gigs of storage space in S3, compressed

//after all events are processed, aggregate the results
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, sum(rCount) as rCount from results group by field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12"
//Use a wildcard to search all sub-folders created above
sqlContext.read.parquet("<aws folder>" + dt + "/*/").registerTempTable("results")
sqlContext.sql(sqlStatement).coalesce(20).write.parquet("<a new aws folder>" + dt + "/")
//This results in 3,295,206,761 rows with an aggregate value of 16,969,050,506 for rCount but consumes 79.32 gigs of storage space in S3, compressed

//The parquet schemas created (both tables match):
 |-- field1: string (nullable = true) (10 characters)
 |-- field2: string (nullable = true) (15 characters)
 |-- field3: string (nullable = true) (50 characters max)
 |-- field4: string (nullable = true) (10 characters)
 |-- field5: string (nullable = true) (10 characters)
 |-- field6: string (nullable = true) (10 characters)
 |-- field7: string (nullable = true) (16 characters)
 |-- field8: string (nullable = true) (10 characters)
 |-- field9: string (nullable = true) (15 characters)
 |-- field10: string (nullable = true) (20 characters)
 |-- field11: string (nullable = true) (14 characters)
 |-- field12: string (nullable = true) (14 characters)
 |-- rCount: long (nullable = true)
 |-- dt: string (nullable = true)
Steve Drew asked Jul 01 '16 21:07

1 Answer

In general, columnar storage formats like Parquet are highly sensitive to data distribution (how the data is organized) and to the cardinality of individual columns. The more organized the data and the lower the cardinality, the more efficient the storage.

An aggregation like the one you apply has to shuffle the data. If you check the execution plan, you'll see that it uses a hash partitioner. This means that after the aggregation the distribution of values can be less efficient than it was in the original data. At the same time, sum can reduce the number of rows but increase the cardinality of the rCount column: before the aggregation every row holds the value 1, afterwards the column can hold many different totals.
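
To illustrate why ordering matters, here is a minimal plain-Scala sketch (no Spark involved). It counts runs of equal consecutive values in a column, which is only a rough proxy for how well run-length style encodings can compress it. The sample values and the names RunLengthDemo and countRuns are made up for this illustration; they are not from the question.

```scala
object RunLengthDemo {
  // Number of runs of equal consecutive values in a column.
  // Fewer runs means run-length style encodings have less to store.
  def countRuns[A](column: Seq[A]): Int =
    if (column.isEmpty) 0
    else 1 + column.zip(column.tail).count { case (a, b) => a != b }

  def main(args: Array[String]): Unit = {
    // Hypothetical low-cardinality column, in the scattered order a
    // hash-partitioned shuffle might leave it in.
    val scattered = Seq("US", "DE", "US", "FR", "DE", "US", "FR", "DE")

    println(s"scattered order: ${countRuns(scattered)} runs")        // 8 runs
    println(s"sorted order:    ${countRuns(scattered.sorted)} runs") // 3 runs

    // Before aggregation every rCount is 1, so the column is a single run.
    val rCountBefore = Seq.fill(scattered.size)(1L)
    println(s"rCount before aggregation: ${countRuns(rCountBefore)} run")
  }
}
```

The same eight values take 8 runs when scattered and 3 when sorted, which is the kind of difference the shuffle can introduce at a much larger scale.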

You can try different tools to correct for that, but not all of them are available in Spark 1.5.2:

  • Sort the complete dataset by low-cardinality columns (quite expensive because of the full shuffle), or use sortWithinPartitions (Spark 1.6.0+).
  • Use the partitionBy method of DataFrameWriter to partition the data by low-cardinality columns.
  • Use the bucketBy and sortBy methods of DataFrameWriter (Spark 2.0.0+) to improve data distribution using bucketing and local sorting.
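
The three options above could look roughly like the sketch below. It is an illustration under assumptions, not a tuned solution: the object and method names are hypothetical, and treating field1 and field2 as the low-cardinality columns is a guess, since the question does not say which columns those are. Only the first two methods apply to Spark 1.x, and sortWithinPartitions needs 1.6.0 or later.

```scala
import org.apache.spark.sql.DataFrame

object CompactWrites {
  // Option 1: keep the 20 output files, but sort rows inside each partition
  // so equal values sit next to each other (Spark 1.6.0+).
  // Assumption: field1 and field2 are low-cardinality columns.
  def writeLocallySorted(df: DataFrame, path: String): Unit =
    df.coalesce(20)
      .sortWithinPartitions("field1", "field2")
      .write.parquet(path)

  // Option 2: one sub-folder per distinct value of a low-cardinality column.
  def writePartitioned(df: DataFrame, path: String): Unit =
    df.write.partitionBy("field1").parquet(path)

  // Option 3: bucketing plus local sorting (Spark 2.0.0+).
  // Bucketed output has to be written with saveAsTable, not parquet(path).
  def writeBucketed(df: DataFrame, table: String): Unit =
    df.write.bucketBy(20, "field1").sortBy("field2").saveAsTable(table)
}
```

For the aggregate in the question, the first variant would be called on the result of sqlContext.sql(sqlStatement) in place of the plain coalesce(20) write. Whether it actually shrinks the output depends on the real cardinalities, so it is worth comparing sizes on a single day of data first.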
zero323 answered Sep 30 '22 17:09