I want to persist a large sorted table to Parquet on S3 and then read it back in and join it against another large sorted table using the sort-merge join strategy.
The problem is that even though I sort these tables on the join key beforehand, once I persist them to Parquet they seem to lose the information about their sortedness. Is there any way to hint to Spark that they do not need to be re-sorted the next time I read them in?
I've been trying this on Spark 1.5 and I keep getting SQL EXPLAIN plans that look like this:
== Physical Plan ==
TungstenProject [pos#28400,workf...#28399]
 SortMergeJoin [CHROM#28403,pos#28400], [CHROM#28399,pos#28332]
  TungstenSort [CHROM#28403 ASC,pos#28400 ASC], false, 0
   TungstenExchange hashpartitioning(CHROM#28403,pos#28400)
    ConvertToUnsafe
     Scan ParquetRelation[file:/....sorted.parquet][pos#284....8424]
  TungstenSort [CHROM#28399 ASC,pos#28332 ASC], false, 0
   TungstenExchange hashpartitioning(CHROM#28399,pos#28332)
    ConvertToUnsafe
     Scan ParquetRelation[file:....exploded_sorted.parquet][pos#2.....399]
You can see the extra TungstenExchange and TungstenSort stages in there, even though this join is on two tables that were both sorted with orderBy on the join keys before being saved to Parquet.
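For reference, here is a minimal sketch of the workflow that produces this plan (Spark 1.5 API). The paths, the input DataFrames and the sqlContext are placeholders; the join keys CHROM and pos are the ones visible in the plan above.

import org.apache.spark.sql.{DataFrame, SQLContext}

val sqlContext: SQLContext = ??? // assumed to be in scope, e.g. from spark-shell
val df1: DataFrame = ???         // first large table (placeholder)
val df2: DataFrame = ???         // second large table (placeholder)

// Sort both tables on the join keys and persist them to Parquet on S3.
df1.orderBy("CHROM", "pos").write.parquet("s3://bucket/sorted.parquet")
df2.orderBy("CHROM", "pos").write.parquet("s3://bucket/exploded_sorted.parquet")

// Read them back and join; the sort order is not carried through Parquet,
// so the plan still inserts TungstenExchange and TungstenSort stages.
val left  = sqlContext.read.parquet("s3://bucket/sorted.parquet")
val right = sqlContext.read.parquet("s3://bucket/exploded_sorted.parquet")
left.join(right, left("CHROM") === right("CHROM") && left("pos") === right("pos")).explain()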
It looks like this is coming in Spark 2.0 along with support for bucketing.
Unfortunately, Spark 2.0 doesn't support writing bucketed data to S3 yet. I tried the Spark 2.0 preview yesterday:
import org.apache.spark.sql.SaveMode

val NUMBER_OF_BUCKETS = 20

// rdd and the implicits needed for rdd.toDF are assumed to be in scope.
rdd.toDF.write.mode(SaveMode.Overwrite)
  .bucketBy(NUMBER_OF_BUCKETS, "data_frame_key")
  .partitionBy("day")
  .save("s3://XXXXX")
And got this error message:
java.lang.IllegalArgumentException: Currently we don't support writing bucketed data to this data source.
at org.apache.spark.sql.DataFrameWriter.assertNotBucketed(DataFrameWriter.scala:462)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
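For what it's worth, the only bucketed-write path Spark 2.0 exposes is saveAsTable, not save(path), which is what the assertNotBucketed check above is complaining about. A rough sketch of that route, assuming a Hive-enabled SparkSession and placeholder table/input names (not a drop-in replacement for writing straight to an S3 path):

import org.apache.spark.sql.{SaveMode, SparkSession}

// Assumes a SparkSession with Hive support; the table name and input path are placeholders.
val spark = SparkSession.builder()
  .appName("bucketed-write-sketch")
  .enableHiveSupport()
  .getOrCreate()

val df = spark.read.parquet("s3://XXXXX/input.parquet")

// bucketBy/sortBy are only honoured by saveAsTable in Spark 2.0; save(path) rejects them.
df.write
  .mode(SaveMode.Overwrite)
  .bucketBy(NUMBER_OF_BUCKETS, "data_frame_key") // NUMBER_OF_BUCKETS as defined above
  .sortBy("data_frame_key")
  .saveAsTable("bucketed_table")

Note that this registers the output as a table in the metastore, so whether the underlying files end up on S3 depends on your warehouse location configuration.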