I have a dataframe of the following form:
import scala.util.Random
val localData = (1 to 100).map(i => (i,Seq.fill(Math.abs(Random.nextGaussian()*100).toInt)(Random.nextDouble)))
val df = sc.parallelize(localData).toDF("id","data")
root
 |-- id: integer (nullable = false)
 |-- data: array (nullable = true)
 |    |-- element: double (containsNull = false)
df.withColumn("data_size",size($"data")).show
+---+--------------------+---------+
| id| data|data_size|
+---+--------------------+---------+
| 1|[0.77845301260182...| 217|
| 2|[0.28806915178410...| 202|
| 3|[0.76304121847720...| 165|
| 4|[0.57955190088558...| 9|
| 5|[0.82134215959459...| 11|
| 6|[0.42193739241567...| 57|
| 7|[0.76381645621403...| 4|
| 8|[0.56507523859466...| 93|
| 9|[0.83541853717244...| 107|
| 10|[0.77955626749231...| 111|
| 11|[0.83721643562080...| 223|
| 12|[0.30546029947285...| 116|
| 13|[0.02705462199952...| 46|
| 14|[0.46646815407673...| 41|
| 15|[0.66312488908446...| 16|
| 16|[0.72644646115640...| 166|
| 17|[0.32210572380128...| 197|
| 18|[0.66680355567329...| 61|
| 19|[0.87055594653295...| 55|
| 20|[0.96600507545438...| 89|
+---+--------------------+---------+
Now I want to apply an expensive UDF whose computation time is roughly proportional to the size of the data array. I wonder how I can repartition my data such that each partition carries approximately the same number of data points (i.e., the sum of data_size per partition, NOT just the number of records).
If I just do df.repartition(100), I may get some partitions containing very large arrays, which then become the bottleneck of the entire Spark stage (all other tasks being already finished). Of course I could just choose an insanely high number of partitions, which will (almost) ensure that each record ends up in a separate partition. But is there another way?
Make a tuple of (item, index), where the index runs over the entire RDD. Swap the key and the value, so that the RDD now contains (index, item). Then repartition to N partitions using an identity partitionFunc, moving each item to partition index % N.
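A minimal sketch of that idea on the underlying RDD, assuming n is the desired number of partitions and IdentityPartitioner is just an illustrative name:

import org.apache.spark.Partitioner

// Partitioner that trusts the key to already be the target partition id.
class IdentityPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
}

val n = 100  // desired number of partitions, purely illustrative

val balancedRdd = df.rdd
  .zipWithIndex()                                     // (row, globalIndex)
  .map { case (row, idx) => ((idx % n).toInt, row) }  // key = index % n, the target partition
  .partitionBy(new IdentityPartitioner(n))
  .values                                             // drop the key again

Note that this balances the number of records per partition, not the number of data points, so a few very large arrays can still land in the same partition.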
If you want to change the number of partitions of your DataFrame, you can simply call the repartition() function. It returns a new DataFrame partitioned by the given partitioning expressions; the resulting DataFrame is hash partitioned.
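For example, assuming a Spark version where repartition accepts partitioning expressions (the number and the column are only illustrative):

val byId = df.repartition(200, $"id")  // 200 partitions, hash partitioned on id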
Regarding the number of Spark partitions: a good starting point is to make the number of partitions in an RDD equal to the number of cores in the cluster, so that all partitions are processed in parallel and resources are used optimally.
The general recommendation for Spark is to have about 4x as many partitions as there are cores available to the application, with the upper bound that each task should still take at least ~100 ms to execute.
As you said, you can increase the number of partitions. I usually use a multiple of the number of cores, e.g. the Spark context's default parallelism times 2-3.
In your case, you could use a bigger multiplier.
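A sketch of that heuristic (the multiplier of 3 is only an example):

val target = sc.defaultParallelism * 3  // a few partitions per core
val dfMore = df.repartition(target)     // evens out record counts, not data points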
Another solution would be to split your df by filtering on the array size, in this way:
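A minimal sketch, where the threshold of 100 array elements is an arbitrary choice for illustration (size comes from org.apache.spark.sql.functions, as used above):

val dfSmall = df.filter(size($"data") < 100)   // many light rows
val dfBig   = df.filter(size($"data") >= 100)  // few heavy rows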
You could then repartition each of them, perform the computation, and union them back.
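Continuing the sketch above; expensiveUdf and the partition counts are placeholders, not something from the original post:

val outSmall = dfSmall.repartition(50).withColumn("result", expensiveUdf($"data"))
val outBig   = dfBig.repartition(200).withColumn("result", expensiveUdf($"data"))
val out      = outSmall.union(outBig)  // unionAll on Spark < 2.0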
Beware that repartitioning may be expensive, since you have large rows to shuffle around.
You could have a look at these slides (27+): https://www.slideshare.net/SparkSummit/custom-applications-with-sparks-rdd-spark-summit-east-talk-by-tejas-patil
They were experiencing very bad data skew and had to handle it in an interesting way.