I want to know if Spark knows the partitioning key of the parquet file and uses this information to avoid shuffles.
Context:
Running Spark 2.0.1 with a local SparkSession. I have a CSV dataset that I am saving as a parquet file on my disk like so:
val df0 = spark
.read
.format("csv")
.option("header", true)
.option("delimiter", ";")
.option("inferSchema", false)
.load("SomeFile.csv"))
import org.apache.spark.sql.functions.col

val df = df0.repartition(42, col("numerocarte"))
df.write
.mode(SaveMode.Overwrite)
.format("parquet")
.option("inferSchema", false)
.save("SomeFile.parquet")
I am creating 42 partitions by column numerocarte. This should group rows with the same numerocarte into the same partition. I don't want to use partitionBy("numerocarte") at write time because I don't want one partition per card; there would be millions of them.
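For reference, this is roughly what the write-time partitioning I am avoiding would look like (a sketch only, not something I ran; the output path is made up):
// Sketch only: partitioning the output files by card number at write time.
// Each distinct numerocarte value gets its own directory (numerocarte=.../),
// which is exactly what I want to avoid here.
df.write
.mode(SaveMode.Overwrite)
.partitionBy("numerocarte")
.parquet("SomeFilePartitionedByCard.parquet")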
After that, in another script, I read this SomeFile.parquet file and do some operations on it. In particular, I am running a window function on it where the partitioning is done on the same column that the parquet file was repartitioned by.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val df2 = spark.read
.format("parquet")
.option("header", true)
.option("inferSchema", false)
.load("SomeFile.parquet")
val w = Window.partitionBy(col("numerocarte"))
.orderBy(col("SomeColumn"))
df2.withColumn("NewColumnName", sum(col("dollars")).over(w))
After the read I can see that the repartition worked as expected: DataFrame df2 has 42 partitions and each of them contains different cards.
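One way to sanity-check this (a sketch; spark_partition_id and countDistinct come from org.apache.spark.sql.functions, already imported above):
// Number of partitions after the read
println(df2.rdd.getNumPartitions) // 42 in my case

// How many distinct cards landed in each partition
df2.groupBy(spark_partition_id().as("pid"))
.agg(countDistinct(col("numerocarte")).as("cards"))
.orderBy("pid")
.show()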
Questions:
1. Does Spark know that the DataFrame df2 is partitioned by column numerocarte?
2. If it does not know, how do I tell Spark the data is already partitioned by the right column?
3. How can I check the partitioning key of a DataFrame? Is there a command for this? I know how to check the number of partitions, but how do I see the partitioning key?
4. If I print the number of partitions after each step, I have 42 partitions after read and 200 partitions after withColumn, which suggests that Spark repartitioned my DataFrame.
Does Spark know that the dataframe df2 is partitioned by column numerocarte?
It does not.
If it does not know, how do I tell Spark the data is already partitioned by the right column?
You don't. Just because you save data which has been shuffled, it does not mean that it will be loaded with the same splits.
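You can see this in the physical plan; a sketch, reusing df2 and w from the question:
// The window query over the reloaded parquet data still plans an Exchange,
// i.e. Spark re-shuffles by numerocarte even though the data was written
// pre-repartitioned on that column.
df2.withColumn("NewColumnName", sum(col("dollars")).over(w)).explain()
// expect a step like: Exchange hashpartitioning(numerocarte#..., 200)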
How can I check the partitioning key of a DataFrame?
There is no partitioning key once you have loaded the data, but you can check queryExecution for the Partitioner.
In practice: use the partitionBy method of DataFrameWriter, or use bucketBy with the metastore and persistent tables. See How to define partitioning of DataFrame? for detailed examples.
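To make the queryExecution check concrete, a sketch (this goes through Spark internals, so treat it as a debugging aid rather than a stable API):
// How Spark believes df2 is partitioned after a plain parquet load.
// Expect UnknownPartitioning here, confirming the repartitioning done before
// the write is not carried over.
println(df2.queryExecution.executedPlan.outputPartitioning)

// The RDD-level partitioner is not set either.
println(df2.rdd.partitioner) // None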
I am answering my own question for future reference, to record what worked.
Following the suggestion of @user8371915, bucketBy works!
I am saving my DataFrame df:
df.write
.bucketBy(250, "userid")
.saveAsTable("myNewTable")
Then when I need to load this table:
val df2 = spark.sql("SELECT * FROM myNewTable")
val w = Window.partitionBy("userid")
val df3 = df2.withColumn("newColumnName", sum(col("someColumn")).over(w))
df3.explain
I confirm that when I do window functions on df2 partitioned by userid, there is no shuffle! Thanks @user8371915!
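What the check looks like (a sketch; the commented output is the kind of thing to expect, not a verbatim log):
// The bucketed scan already reports a hash partitioning on userid, so the
// Window's required distribution is satisfied and df3.explain shows no
// Exchange step before the Window operator.
println(df2.queryExecution.executedPlan.outputPartitioning)
// e.g. hashpartitioning(userid#..., 250)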
Some things I learned while investigating it:
- You can load the bucketed table with spark.read.format("parquet").load("path/to/myNewTable"), but the DataFrame created this way will not keep the original partitioning! You must use a spark.sql select to get a correctly partitioned DataFrame.
- You can check how the table was bucketed with spark.sql("describe formatted myNewTable").collect.foreach(println). This will tell you what columns were used for bucketing and how many buckets there are.
- You can combine bucketBy with .sortBy(), and the sort will also be preserved in the hive table: df.write.bucketBy(250, "userid").sortBy("someColumnName").saveAsTable("myNewTable")
- When working locally, myNewTable is saved to a spark-warehouse folder in my local Scala SBT project. When saving in cluster mode with mesos via spark-submit, it is saved to the hive warehouse. For me it was located in /user/hive/warehouse.
- When doing spark-submit you need to add two options to your SparkSession: .config("hive.metastore.uris", "thrift://address-to-your-master:9083") and .enableHiveSupport(). Otherwise the hive tables you created will not be visible.
- If you want to save the table to a specific database, run spark.sql("USE yourDatabase") before bucketing.
Update 05-02-2018
I encountered some problems with Spark bucketing and the creation of Hive tables. Please refer to the question, replies and comments in Why is Spark saveAsTable with bucketBy creating thousands of files?