 

Using DataSet.repartition in Spark 2 - several tasks handle more than one partition

We have a Spark Streaming application (Spark 2.1 running on Hortonworks 2.6) and use DataSet.repartition (on a DataSet<Row> that's read from Kafka) in order to repartition the DataSet<Row>'s partitions according to a given column (called block_id).

We start with a DataSet<Row> containing 50 partitions and end up (after the call to DataSet.repartition) with a number of partitions equal to the number of unique block_ids.
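For reference, the repartition call is essentially the following (a simplified sketch shown in Scala; the dataset and variable names are ours):

import org.apache.spark.sql.functions.col

// rows is the DataSet<Row> read from Kafka (50 partitions);
// repartition by the block_id column so that all Rows with the same block_id
// end up in the same partition
val repartitioned = rows.repartition(col("block_id"))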

The problem is that DataSet.repartition does not behave as we expected - when we look at the event timeline of the Spark job that runs the repartition, we see there are several tasks that handle 1 block_id and fewer tasks that handle 2 block_ids, or even 3 or 4 block_ids.

It seems that DataSet.repartition ensures that all the Rows with the same block_id will be inside a single partition, but not that each task that creates a partition will handle only one block_id.

The result is that the repartition job (that runs inside the streaming application) takes as much time as its longest task (which is the task that handles the most block_ids).

We tried playing with the number of vcores given to the streaming app - from 10 to 25 to 50 (we have 50 partitions in the original RDD that's read from Kafka) - but the result was the same: there is always at least one task that handles more than one block_id.

We even tried increasing the batch time; again, that didn't help us achieve the goal of one task handling one block_id.

To give an example - here's the event timeline and the tasks table describing a run of the repartition Spark job:

event timeline - the two tasks in red are the ones that handle two block_ids:

[event timeline screenshot]

tasks table - the two tasks in red are the same two from above - notice that the duration of each of them is twice the duration of all the other tasks (which handle only one block_id):

[tasks table screenshot]

This is a problem for us because the streaming application is delayed due to these long tasks, and we need a solution that will enable us to perform a repartition on a DataSet while having each task handle only one block_id.

And if that's not possible, then maybe it's possible on a JavaRDD? Since in our case the DataSet<Row> we run the repartition on is created from a JavaRDD.

asked Jul 31 '17 by Elad Eldor



1 Answer

There are two problems you need to consider:

  • Use a custom partitioner that ensures uniform data distribution: one block_id per partition
  • Size the cluster so that you have enough executors to run all tasks (block_ids) simultaneously (a configuration sketch follows this list)
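
For the second point, here is a rough sketch of the sizing this implies (the numbers are illustrative and assume 5 distinct block_ids, as in the example further down): you need at least one concurrently runnable task slot per block_id.

// Illustrative sizing only: with 5 distinct block_ids you want at least
// 5 task slots available at the same time, e.g. 5 executors with 1 core each.
val spark = org.apache.spark.sql.SparkSession.builder()
  .appName("block-id-repartition")
  .config("spark.executor.instances", "5") // one executor per block_id (illustrative)
  .config("spark.executor.cores", "1")     // one concurrent task per executor
  .getOrCreate()

The same can be done at submit time with --num-executors and --executor-cores.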

As you've seen, a simple repartition on the DataFrame doesn't assure you'll get a uniform distribution. When you repartition by block_id it will use the HashPartitioner, with the formula:

Utils.nonNegativeMod(key.hashCode, numPartitions)

See: https://github.com/apache/spark/blob/branch-2.2/core/src/main/scala/org/apache/spark/Partitioner.scala#L80-L88

It's very possible that 2+ keys are assigned to the same partition_id, since the partition_id is the key's hashCode modulo numPartitions.
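
To see how several keys can collapse onto one partition_id, here's a toy sketch (nonNegativeMod is re-implemented locally just for illustration, and the keys below are made up, not the block_ids from the example):

// Toy illustration: partition_id is hashCode modulo numPartitions, so any keys
// whose hash codes differ by a multiple of numPartitions end up in the same partition.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

val numPartitions = 8
// For Scala Ints hashCode is the value itself, so 1, 9 and 17 all map to partition 1
Seq(1, 9, 17).foreach { key =>
  println(s"key $key -> partition ${nonNegativeMod(key.hashCode, numPartitions)}")
}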

What you need can be achieved by using the RDD with a custom partitioner. The easiest approach is to extract the list of distinct block_ids before repartitioning.

Here's a simple example. Let's say you have 5 blocks (2, 3, 6, 8, 9) and your cluster has 8 executors (it can run up to 8 tasks simultaneously), so we're over-provisioned by 3 executors:

scala> spark.conf.get("spark.sql.shuffle.partitions")
res0: String = 8

scala> spark.conf.get("spark.default.parallelism")
res1: String = 8

// Basic class to store dummy records
scala> case class MyRec(block_id: Int, other: String)
defined class MyRec

// Sample DS
scala> val ds = List((2,"A"), (3,"X"), (3, "B"), (9, "Y"), (6, "C"), (9, "M"), (6, "Q"), (2, "K"), (2, "O"), (6, "W"), (2, "T"), (8, "T")).toDF("block_id", "other").as[MyRec]
ds: org.apache.spark.sql.Dataset[MyRec] = [block_id: int, other: string]

scala> ds.show
+--------+-----+
|block_id|other|
+--------+-----+
|       2|    A|
|       3|    X|
|       3|    B|
|       9|    Y|
|       6|    C|
|       9|    M|
|       6|    Q|
|       2|    K|
|       2|    O|
|       6|    W|
|       2|    T|
|       8|    T|
+--------+-----+

// Default partitioning gets data distributed as uniformly as possible (record count)
scala> ds.rdd.getNumPartitions
res3: Int = 8

// Print records distribution by partition
scala> ds.rdd.mapPartitionsWithIndex((idx, it) => Iterator((idx, it.toList))).toDF("partition_id", "block_ids").show
+------------+--------------+
|partition_id|     block_ids|
+------------+--------------+
|           0|       [[2,A]]|
|           1|[[3,X], [3,B]]|
|           2|       [[9,Y]]|
|           3|[[6,C], [9,M]]|
|           4|       [[6,Q]]|
|           5|[[2,K], [2,O]]|
|           6|       [[6,W]]|
|           7|[[2,T], [8,T]]|
+------------+--------------+

// repartitioning by block_id leaves 4 partitions empty and assigns 2 block_ids (6,9) to same partition (1)
scala> ds.repartition('block_id).rdd.mapPartitionsWithIndex((idx, it) => Iterator((idx, it.toList))).toDF("partition_id", "block_ids").where(size('block_ids) > 0).show(false)
+------------+-----------------------------------+
|partition_id|block_ids                          |
+------------+-----------------------------------+
|1           |[[9,Y], [6,C], [9,M], [6,Q], [6,W]]|
|3           |[[3,X], [3,B]]                     |
|6           |[[2,A], [2,K], [2,O], [2,T]]       |
|7           |[[8,T]]                            |
+------------+-----------------------------------+

// Create a simple mapping for block_id to partition_id to be used by our custom partitioner (logic may be more elaborate or static if the list of block_ids is static):
scala> val mappings = ds.map(_.block_id).dropDuplicates.collect.zipWithIndex.toMap
mappings: scala.collection.immutable.Map[Int,Int] = Map(6 -> 1, 9 -> 0, 2 -> 3, 3 -> 2, 8 -> 4)

//custom partitioner assigns partition_id according to the mapping arg
scala> class CustomPartitioner(mappings: Map[Int,Int]) extends org.apache.spark.Partitioner {
     |   override def numPartitions: Int = mappings.size
     |   override def getPartition(rec: Any): Int = { mappings.getOrElse(rec.asInstanceOf[Int], 0) }
     | }
defined class CustomPartitioner

// Repartition DS using new partitioner
scala> val newDS = ds.rdd.map(r => (r.block_id, r)).partitionBy(new CustomPartitioner(mappings)).toDS
newDS: org.apache.spark.sql.Dataset[(Int, MyRec)] = [_1: int, _2: struct<block_id: int, other: string>]

// Display evenly distributed block_ids
scala> newDS.rdd.mapPartitionsWithIndex((idx, it) => Iterator((idx, it.toList))).toDF("partition_id", "block_ids").where(size('block_ids) > 0).show(false)
+------------+--------------------------------------------+
|partition_id|block_ids                                   |
+------------+--------------------------------------------+
|0           |[[9,[9,Y]], [9,[9,M]]]                      |
|1           |[[6,[6,C]], [6,[6,Q]], [6,[6,W]]]           |
|2           |[[3,[3,X]], [3,[3,B]]]                      |
|3           |[[2,[2,A]], [2,[2,K]], [2,[2,O]], [2,[2,T]]]|
|4           |[[8,[8,T]]]                                 |
+------------+--------------------------------------------+
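
If you need to drop the synthetic key afterwards and keep working with the original records, one option (a sketch based on the example above) is:

// Sketch: keep only the MyRec values after the custom partitioning.
// values is a narrow transformation, so the records physically stay in the
// partitions assigned by CustomPartitioner (no extra shuffle), although the
// partitioner metadata itself is dropped.
val recsDS = ds.rdd
  .map(r => (r.block_id, r))
  .partitionBy(new CustomPartitioner(mappings))
  .values
  .toDS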
answered Oct 22 '22 by Traian