
Spark: Most efficient way to sort and partition data to be written as parquet

My data is in principle a table, which contains a column ID and a column GROUP_ID, besides other 'data'.

In the first step I read CSVs into Spark, do some processing to prepare the data for the second step, and write the data as parquet. The second step does a lot of groupBy('GROUP_ID') and Window.partitionBy('GROUP_ID').orderBy('ID').

The goal now is to avoid shuffling in the second step by efficiently loading and laying out the data in the first step, which runs only once.

Question Part 1: AFAIK, Spark preserves the partitioning when loading from parquet (which is actually the basis of any "optimized write consideration" to be made) - correct?

I came up with three possibilities:

  • df.orderBy('ID').write.partitionBy('GROUP_ID').parquet('/path/to/parquet')
  • df.orderBy('ID').repartition(n, 'GROUP_ID').write.parquet('/path/to/parquet')
  • df.repartition(n, 'GROUP_ID').sortWithinPartitions('ID').write.parquet('/path/to/parquet')
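To make the intent of the third option concrete, here is a toy model in plain Python (no Spark involved). It assumes integer group IDs and uses a simple modulo as a stand-in for Spark's hash partitioner; the function name and the sample rows are made up for illustration. It only shows the layout one would expect: all rows of a group end up in the same partition, and each partition is sorted by ID.

```python
from collections import defaultdict


def repartition_and_sort(rows, n):
    """Toy model: hash-partition rows on GROUP_ID, then sort each partition by ID."""
    partitions = defaultdict(list)
    for row in rows:
        # Stand-in for Spark's hash partitioner (assumption: integer GROUP_ID).
        partitions[row["GROUP_ID"] % n].append(row)
    # Local sort per partition, analogous to sortWithinPartitions('ID').
    return {p: sorted(part, key=lambda r: r["ID"]) for p, part in partitions.items()}


# Hypothetical sample rows.
rows = [
    {"ID": 5, "GROUP_ID": 1},
    {"ID": 2, "GROUP_ID": 2},
    {"ID": 3, "GROUP_ID": 1},
    {"ID": 1, "GROUP_ID": 3},
    {"ID": 4, "GROUP_ID": 2},
]
parts = repartition_and_sort(rows, 2)
```

Note that a partition can hold several groups, so the rows of different groups may be interleaved within one partition even though the partition as a whole is ordered by ID.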

I would set n such that the individual parquet files would be ~100MB.
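As a rough sketch of how n could be derived, the helper below divides an estimated output size by the target file size. The function name and the 25 GiB figure are hypothetical. The estimate should be the expected size of the parquet output, which usually differs from the CSV size because of encoding and compression.

```python
import math


def num_partitions(total_bytes: int, target_file_bytes: int = 100 * 1024**2) -> int:
    """Number of partitions so that each output file is roughly target_file_bytes."""
    # Always return at least one partition, even for tiny or empty inputs.
    return max(1, math.ceil(total_bytes / target_file_bytes))


# Hypothetical example: about 25 GiB of parquet output, 100 MiB per file.
n = num_partitions(25 * 1024**3)
```

Because repartition(n, 'GROUP_ID') assigns rows by hash of the column, the actual file sizes will only be close to the target if the groups are of similar size.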

Question Part 2: Is it correct that the three options produce the same or similar results with regard to the goal (avoiding shuffling in the second step)? If not, what is the difference, and which one is better?

Question Part 3: Which of the three options performs better regarding step 1?

Thanks for sharing your knowledge!


EDIT 2017-07-24

After doing some tests (writing to and reading from parquet) it seems that Spark is not able to recover partitionBy and orderBy information by default in the second step. The number of partitions (as obtained from df.rdd.getNumPartitions()) seems to be determined by the number of cores and/or by spark.default.parallelism (if set), but not by the number of parquet partitions. So the assumption in question 1 would be wrong, and questions 2 and 3 would be irrelevant.

So it turns out the real question is: is there a way to tell Spark that the data is already partitioned by column X and sorted by column Y?

asked Jul 20 '17 20:07 by akoeltringer


2 Answers

You will probably be interested in the bucketing support in Spark.

See the details here: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-bucketing.html

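import org.apache.spark.sql.SaveMode

// Save `large` as a table with 4 buckets hashed on "id", sorted by "id" within each bucket.
large.write
  .bucketBy(4, "id")
  .sortBy("id")
  .mode(SaveMode.Overwrite)
  .saveAsTable(bucketedTableName)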

Note that Spark 2.4 added support for bucket pruning (similar to partition pruning).

A more direct match for what you are looking for is Hive's bucketed sorted tables: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-BucketedSortedTables. This is not yet available in Spark (see the PS below).

Also note that Spark will not load the sorting information automatically. But since the data is already sorted, a sort operation on it will be much faster because there is little work to do, e.g. one pass over the data just to confirm that it is already sorted.
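Applied to the columns from the question, a bucketed write in PySpark could look roughly like the sketch below. The function names, the table name and the bucket count are placeholders; df and spark are assumed to be an existing DataFrame and SparkSession. The second function reads the table back and prints the physical plan of a groupBy, so you can check it for "Exchange hashpartitioning" nodes, which indicate a shuffle.

```python
def write_bucketed(df, table_name, num_buckets):
    """Save df as a table bucketed by GROUP_ID and sorted by ID within each bucket."""
    (
        df.write
        .bucketBy(num_buckets, "GROUP_ID")
        .sortBy("ID")
        .mode("overwrite")
        # Bucketing metadata is stored with the table, so this uses
        # saveAsTable rather than .parquet(path).
        .saveAsTable(table_name)
    )


def show_group_plan(spark, table_name):
    """Read the bucketed table back and print the physical plan of a groupBy."""
    grouped = spark.table(table_name).groupBy("GROUP_ID").count()
    # Inspect the printed plan for "Exchange hashpartitioning" (a shuffle).
    grouped.explain()
    return grouped
```

For example, write_bucketed(df, "events_bucketed", 8) followed by show_group_plan(spark, "events_bucketed"). Whether the resulting plan is free of shuffles depends on the Spark version and configuration, so it is worth checking the explain output rather than assuming it.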

PS. Spark and Hive bucketing are slightly different. This is the umbrella ticket for making Spark compatible with bucketed tables created in Hive: https://issues.apache.org/jira/browse/SPARK-19256

answered Nov 03 '22 00:11 by Tagar


As far as I know, no: there is no way to read data from parquet and tell Spark that it is already partitioned by some expression and ordered.

In short, one file on HDFS etc. is too big for one Spark partition. And even if you read each whole file into one partition by playing with Parquet properties such as parquet.split.files=false, parquet.task.side.metadata=true etc., that would cost more than just doing one shuffle.

answered Nov 03 '22 01:11 by Mikhail Dubkov