How to persist sorted Parquet tables for future sort-merge joins?

I want to persist a large sorted table to Parquet on S3 and then read it back in and join it, using the sort-merge join strategy, against another large sorted table.

The problem is: even though I sort these tables on the join key beforehand, once I persist them to Parquet they seem to lose the information about their sortedness. Is there any way to hint to Spark that they do not need to be re-sorted the next time I read them in?

I've been trying this on Spark 1.5, and I keep getting SQL EXPLAIN plans that look like this:

== Physical Plan ==
TungstenProject [pos#28400,workf...#28399]
 SortMergeJoin [CHROM#28403,pos#28400], [CHROM#28399,pos#28332]
  TungstenSort [CHROM#28403 ASC,pos#28400 ASC], false, 0
   TungstenExchange hashpartitioning(CHROM#28403,pos#28400)
    ConvertToUnsafe
     Scan ParquetRelation[file:/....sorted.parquet][pos#284....8424]
  TungstenSort [CHROM#28399 ASC,pos#28332 ASC], false, 0
   TungstenExchange hashpartitioning(CHROM#28399,pos#28332)
    ConvertToUnsafe
     Scan ParquetRelation[file:....exploded_sorted.parquet][pos#2.....399]

You can see the extra TungstenExchange and TungstenSort stages in there, even though this join is on two tables that were sorted with orderBy on the join keys before being saved to Parquet. A minimal sketch of the setup is below.
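For reference, here is a minimal sketch of the setup that produces a plan like the one above (Spark 1.5 API; the paths, sample rows, and non-key column names are illustrative, not the actual data):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("smj-repro").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val a = Seq(("1", 100, "x"), ("1", 200, "y")).toDF("CHROM", "pos", "val_a")
val b = Seq(("1", 100, "p"), ("1", 200, "q")).toDF("CHROM", "pos", "val_b")

// Sort on the join keys before persisting, as described above.
a.orderBy("CHROM", "pos").write.parquet("/tmp/sorted.parquet")
b.orderBy("CHROM", "pos").write.parquet("/tmp/exploded_sorted.parquet")

// On re-read, nothing records that the files are already sorted, so the
// plan shows TungstenExchange + TungstenSort on both sides of the join.
val left = sqlContext.read.parquet("/tmp/sorted.parquet")
val right = sqlContext.read.parquet("/tmp/exploded_sorted.parquet")
left.join(right, left("CHROM") === right("CHROM") && left("pos") === right("pos"))
    .explain()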

Asked Aug 26 '15 by JKnight

2 Answers

It looks like this is coming in Spark 2.0 along with support for bucketing.
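A hedged sketch of what the Spark 2.0 bucketing API looks like (the table name, bucket count, and sample data here are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("bucketed-smj").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(("1", 100, "x"), ("2", 200, "y")).toDF("CHROM", "pos", "value")

// bucketBy + sortBy record the bucket layout and sort order in the
// table metadata, so they survive the round trip to disk.
df.write
  .bucketBy(20, "CHROM", "pos")
  .sortBy("CHROM", "pos")
  .saveAsTable("variants_bucketed")
// Two tables bucketed and sorted the same way on the join keys can
// then be sort-merge joined without re-shuffling or re-sorting.

Note that the bucketing metadata lives in the session catalog/metastore, which is why the write has to go through saveAsTable rather than a plain path-based save.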

Answered by JKnight


Unfortunately, Spark 2.0 doesn't support writing bucketed data to S3 yet. I tried it on the Spark 2.0 preview yesterday:

import org.apache.spark.sql.SaveMode

val NUMBER_OF_BUCKETS = 20
rdd.toDF.write.mode(SaveMode.Overwrite)
        .bucketBy(NUMBER_OF_BUCKETS, "data_frame_key")
        .partitionBy("day")
        .save("s3://XXXXX")

And got this error message:

java.lang.IllegalArgumentException: Currently we don't support writing bucketed data to this data source.
    at org.apache.spark.sql.DataFrameWriter.assertNotBucketed(DataFrameWriter.scala:462)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
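The assertion is specifically about the path-based save(); bucketBy is only supported through saveAsTable. A possible workaround, sketched here but not verified against the 2.0 preview (the table name is made up), is to route the same write through saveAsTable with an explicit path option, which creates an external table backed by that location:

import org.apache.spark.sql.SaveMode

// bucketBy works only via saveAsTable, not save(); an external table
// with an explicit path option may still place the files on S3.
rdd.toDF.write.mode(SaveMode.Overwrite)
        .bucketBy(NUMBER_OF_BUCKETS, "data_frame_key")
        .partitionBy("day")
        .option("path", "s3://XXXXX")
        .saveAsTable("bucketed_table")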
Answered by Yuan Zhao