Trying to understand how Hive partitions relate to Spark partitions, culminating in a question about joins.
I have 2 external Hive tables, both backed by S3 buckets and partitioned by date, so in each bucket there are keys with the name format date=<yyyy-MM-dd>/<filename>.
Question 1:
If I read this data into Spark:
val table1 = spark.table("table1").as[Table1Row]
val table2 = spark.table("table2").as[Table2Row]
then how many partitions are the resultant datasets going to have respectively? Partitions equal to the number of objects in S3?
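(For reference, I know I can inspect the resulting counts with something like the snippet below; my question is about what determines them.)
println(table1.rdd.getNumPartitions)
println(table2.rdd.getNumPartitions)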
Question 2:
Suppose the two row types have the following schema:
Table1Row(date: Date, id: String, ...)
Table2Row(date: Date, id: String, ...)
and that I want to join table1 and table2 on the fields date and id:
table1.joinWith(table2,
table1("date") === table2("date") &&
table1("id") === table2("id")
)
Is Spark going to be able to utilize the fact that one of the fields being joined on is the partition key in the Hive tables to optimize the join? And if so how?
Question 3:
Suppose now that I am using RDDs instead:
val rdd1 = table1.rdd
val rdd2 = table2.rdd
AFAIK, the syntax for the join using the RDD API would look something like:
rdd1.map(row1 => ((row1.date, row1.id), row1))
  .join(rdd2.map(row2 => ((row2.date, row2.id), row2)))
Again, is Spark going to be able to utilize the fact that the partition key in the Hive tables is being used in the join?
Note right away that Spark partitions ≠ Hive partitions. Both are chunks of data, but Spark partitions data so it can be processed in parallel in memory, whereas a Hive partition is a division of the data in storage, on disk, in the persisted layout.
Spark automatically partitions RDDs and distributes the partitions across different nodes. A partition in Spark is an atomic chunk of data (a logical division of the data) stored on a node in the cluster. Partitions are the basic units of parallelism in Apache Spark; RDDs are collections of partitions.
Apache Spark supports two types of partitioning: hash partitioning and range partitioning.
By default, Spark/PySpark creates a number of partitions equal to the number of CPU cores in the machine. The data of each partition resides on a single machine, and Spark/PySpark creates a task for each partition. Shuffle operations move data from one partition to other partitions.
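As a rough illustration of the two strategies on the Dataset API (a sketch only: the partition count of 200 is arbitrary, and repartitionByRange requires Spark 2.3.0+):
// Hash partitioning: rows with the same (date, id) are hashed into the same Spark partition.
val hashed = table1.repartition(200, table1("date"), table1("id"))

// Range partitioning: rows are split into contiguous, sorted ranges of date.
val ranged = table1.repartitionByRange(200, table1("date"))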
then how many partitions are the resultant datasets going to have respectively? Partitions equal to the number of objects in S3?
Impossible to answer given the information you've provided. In recent versions the number of partitions depends primarily on spark.sql.files.maxPartitionBytes, although other factors can play some role as well.
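As a rough illustration (a sketch only: the 32 MB value is arbitrary, and the exact split planning also involves other settings), lowering this threshold increases the number of input partitions on the next read:
// Lower the maximum bytes packed into a single input partition (default is 128 MB).
spark.conf.set("spark.sql.files.maxPartitionBytes", 32L * 1024 * 1024)

// Re-reading the table now plans more, smaller input partitions.
println(spark.table("table1").rdd.getNumPartitions)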
Is Spark going to be able to utilize the fact that one of the fields being joined on is the partition key in the Hive tables to optimize the join?
Not as of today (Spark 2.3.0); however, Spark can utilize bucketing (DISTRIBUTE BY) to optimize joins. See How to define partitioning of DataFrame?. This might change in the future, once the Data Source API v2 stabilizes.
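A minimal sketch of the bucketing approach, assuming you are free to rewrite both tables as bucketed metastore tables (the bucket count of 64 and the *_bucketed table names are made up for illustration):
// Rewrite both tables bucketed (and sorted) by the join keys.
spark.table("table1").write
  .bucketBy(64, "date", "id")
  .sortBy("date", "id")
  .saveAsTable("table1_bucketed")

spark.table("table2").write
  .bucketBy(64, "date", "id")
  .sortBy("date", "id")
  .saveAsTable("table2_bucketed")

// A join on the bucketing columns can then skip the shuffle (exchange) on both sides.
val joined = spark.table("table1_bucketed")
  .join(spark.table("table2_bucketed"), Seq("date", "id"))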
Suppose now that I am using RDDs instead (...) Again, is Spark going to be able to utilise the fact that the partition key in the Hive tables is being used in the join?
Not at all. Even if the data is bucketed, RDD transformations and functional Dataset transformations are black boxes, so no such optimization can be, or is, applied here.
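The most you can do on the RDD side is co-partition both sides yourself, which lets the join reuse the partitioner instead of shuffling again, but this is manual work and knows nothing about the Hive partition layout (a sketch only: the partition count of 200 is arbitrary):
import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(200)

// Key both RDDs by (date, id) and pre-partition them with the same partitioner.
val keyed1 = rdd1.map(r => ((r.date, r.id), r)).partitionBy(partitioner)
val keyed2 = rdd2.map(r => ((r.date, r.id), r)).partitionBy(partitioner)

// Both sides share the partitioner, so the join itself does not trigger another shuffle.
val joined = keyed1.join(keyed2)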