We have a Spark Streaming application (Spark 2.1 running on Hortonworks 2.6) that uses DataSet.repartition on a DataSet<Row> read from Kafka in order to repartition the DataSet<Row> according to a given column (called block_id).
We start with a DataSet<Row> containing 50 partitions and end up (after the call to DataSet.repartition) with a number of partitions equal to the number of unique block_ids.
The problem is that DataSet.repartition doesn't behave as we expected: when we look at the event timeline of the Spark job that runs the repartition, we see several tasks that handle 1 block_id and fewer tasks that handle 2 block_ids, or even 3 or 4.
It seems that DataSet.repartition ensures that all the Rows with the same block_id end up inside a single partition, but not that each task that creates a partition handles only one block_id.
The result is that the repartition job (which runs inside the streaming application) takes as long as its longest task, which is the task that handles the most block_ids.
We tried playing with the number of vcores given to the streaming app - from 10 to 25 to 50 (we have 50 partitions in the original RDD read from Kafka) - but the result was the same: there is always at least one task that handles more than one block_id.
We even tried increasing the batch time; again, that didn't help us achieve the goal of one task handling one block_id.
To give an example, here are the event timeline and the tasks table describing a run of the repartition Spark job:
Event timeline - the two tasks in red are the ones that handle two block_ids.
Tasks table - the two tasks in red are the same two from above; notice that the duration of each of them is twice the duration of all the other tasks (which handle only one block_id).
This is a problem for us because the streaming application is delayed by these long tasks, and we need a solution that will enable us to perform repartition on a DataSet while having each task handle only one block_id.
And if that's not possible, then maybe it's possible on a JavaRDD, since in our case the DataSet<Row> we run the repartition on is created from a JavaRDD.
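For reference, a minimal sketch of this kind of repartition-by-column call (in Scala spark-shell style; the inline Dataset and its values are only placeholders for the data we actually read from Kafka):

import org.apache.spark.sql.functions.col

// Placeholder data standing in for the Kafka-sourced DataSet<Row>
val ds = Seq((1, "a"), (2, "b"), (1, "c")).toDF("block_id", "payload")

// Repartition by column: rows with the same block_id land in the same partition,
// but nothing guarantees one block_id per partition (the assignment is hash-based)
val repartitioned = ds.repartition(col("block_id"))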
Spark will run one task for each partition of the cluster. Typically you want 2-4 partitions for each CPU in your cluster.
If you want to increase the number of partitions of your DataFrame, you can use the repartition() function. It returns a new DataFrame partitioned by the given partitioning expressions; the resulting DataFrame is hash partitioned.
Both repartition() and coalesce() let you repartition. Note: when you want to reduce the number of partitions, it is recommended to use coalesce() over repartition(), as it uses fewer resources because it avoids a full shuffle.
The general recommendation for Spark is to have 4x as many partitions as the number of cores available to the application and, as an upper bound, each task should take 100ms+ to execute.
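As a quick sketch of the difference described above (the DataFrame and partition counts here are arbitrary, just for illustration):

// Any small DataFrame will do for the illustration
val df = spark.range(0, 1000).toDF("id")

// repartition(n): full shuffle, produces n roughly equal partitions (can increase or decrease the count)
val up = df.repartition(16)

// coalesce(n): merges existing partitions without a full shuffle, so it can only reduce the count
val down = df.coalesce(4)

up.rdd.getNumPartitions   // 16
down.rdd.getNumPartitions // 4 (or fewer, if df started with fewer partitions)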
The two problems you need to consider:
As you've seen, a simple repartition on the DataFrame doesn't assure you a uniform distribution. When you repartition by block_id it will use the HashPartitioner, with the formula:
Utils.nonNegativeMod(key.hashCode, numPartitions)
See: https://github.com/apache/spark/blob/branch-2.2/core/src/main/scala/org/apache/spark/Partitioner.scala#L80-L88
It's very possible that 2+ keys are assigned to the same partition_id, since the partition_id is the key's hashCode modulo numPartitions.
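To make the collision concrete, here is a small sketch that re-implements that helper locally (the real method lives in org.apache.spark.util.Utils; the keys below are arbitrary and chosen only to show distinct keys mapping to the same partition_id):

// Local re-implementation of Utils.nonNegativeMod, for illustration only
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

val numPartitions = 8
// For Int keys, hashCode is the value itself, so 1, 9 and 17 all map to partition 1
Seq(1, 9, 17).map(k => k -> nonNegativeMod(k.hashCode, numPartitions))
// => List((1,1), (9,1), (17,1))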
What you need can be achieved by using the RDD with a custom partitioner. The easiest way is to extract the list of distinct block_ids before repartitioning.
Here's a simple example. Let's say you can have 5 blocks (2, 3, 6, 8, 9) and your cluster has 8 executors (it can run up to 8 tasks simultaneously), so we're over-provisioned by 3 executors:
scala> spark.conf.get("spark.sql.shuffle.partitions")
res0: String = 8
scala> spark.conf.get("spark.default.parallelism")
res1: String = 8
// Basic class to store dummy records
scala> case class MyRec(block_id: Int, other: String)
defined class MyRec
// Sample DS
scala> val ds = List((2,"A"), (3,"X"), (3, "B"), (9, "Y"), (6, "C"), (9, "M"), (6, "Q"), (2, "K"), (2, "O"), (6, "W"), (2, "T"), (8, "T")).toDF("block_id", "other").as[MyRec]
ds: org.apache.spark.sql.Dataset[MyRec] = [block_id: int, other: string]
scala> ds.show
+--------+-----+
|block_id|other|
+--------+-----+
| 2| A|
| 3| X|
| 3| B|
| 9| Y|
| 6| C|
| 9| M|
| 6| Q|
| 2| K|
| 2| O|
| 6| W|
| 2| T|
| 8| T|
+--------+-----+
// Default partitioning gets data distributed as uniformly as possible (record count)
scala> ds.rdd.getNumPartitions
res3: Int = 8
// Print records distribution by partition
scala> ds.rdd.mapPartitionsWithIndex((idx, it) => Iterator((idx, it.toList))).toDF("partition_id", "block_ids").show
+------------+--------------+
|partition_id| block_ids|
+------------+--------------+
| 0| [[2,A]]|
| 1|[[3,X], [3,B]]|
| 2| [[9,Y]]|
| 3|[[6,C], [9,M]]|
| 4| [[6,Q]]|
| 5|[[2,K], [2,O]]|
| 6| [[6,W]]|
| 7|[[2,T], [8,T]]|
+------------+--------------+
// Repartitioning by block_id leaves 4 partitions empty and assigns 2 block_ids (6, 9) to the same partition (1)
scala> ds.repartition('block_id).rdd.mapPartitionsWithIndex((idx, it) => Iterator((idx, it.toList))).toDF("partition_id", "block_ids").where(size('block_ids) > 0).show(false)
+------------+-----------------------------------+
|partition_id|block_ids |
+------------+-----------------------------------+
|1 |[[9,Y], [6,C], [9,M], [6,Q], [6,W]]|
|3 |[[3,X], [3,B]] |
|6 |[[2,A], [2,K], [2,O], [2,T]] |
|7 |[[8,T]] |
+------------+-----------------------------------+
// Create a simple block_id -> partition_id mapping to be used by our custom partitioner (the logic may be more elaborate, or the mapping may be static if the list of block_ids is known in advance):
scala> val mappings = ds.map(_.block_id).dropDuplicates.collect.zipWithIndex.toMap
mappings: scala.collection.immutable.Map[Int,Int] = Map(6 -> 1, 9 -> 0, 2 -> 3, 3 -> 2, 8 -> 4)
// Custom partitioner that assigns partition_id according to the mappings argument
scala> class CustomPartitioner(mappings: Map[Int,Int]) extends org.apache.spark.Partitioner {
| override def numPartitions: Int = mappings.size
     | override def getPartition(key: Any): Int = { mappings.getOrElse(key.asInstanceOf[Int], 0) }
| }
defined class CustomPartitioner
// Repartition DS using new partitioner
scala> val newDS = ds.rdd.map(r => (r.block_id, r)).partitionBy(new CustomPartitioner(mappings)).toDS
newDS: org.apache.spark.sql.Dataset[(Int, MyRec)] = [_1: int, _2: struct<block_id: int, other: string>]
// Display evenly distributed block_ids
scala> newDS.rdd.mapPartitionsWithIndex((idx, it) => Iterator((idx, it.toList))).toDF("partition_id", "block_ids").where(size('block_ids) > 0).show(false)
+------------+--------------------------------------------+
|partition_id|block_ids |
+------------+--------------------------------------------+
|0 |[[9,[9,Y]], [9,[9,M]]] |
|1 |[[6,[6,C]], [6,[6,Q]], [6,[6,W]]] |
|2 |[[3,[3,X]], [3,[3,B]]] |
|3 |[[2,[2,A]], [2,[2,K]], [2,[2,O]], [2,[2,T]]]|
|4 |[[8,[8,T]]] |
+------------+--------------------------------------------+
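If you then need a Dataset[MyRec] again (dropping the key that was added only for partitioning), one way to do it is sketched below; mapping away the key keeps each record in the partition it was assigned to, although the partitioner metadata itself is not carried over:

// Keep only the records; the number of partitions (5) and their contents are unchanged
val recsDS = newDS.rdd.map(_._2).toDS
recsDS.rdd.getNumPartitions // should be 5, one per distinct block_id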