
How to (equally) partition array-data in spark dataframe

I have a dataframe of the following form:

import scala.util.Random
val localData = (1 to 100).map(i => (i,Seq.fill(Math.abs(Random.nextGaussian()*100).toInt)(Random.nextDouble)))
val df = sc.parallelize(localData).toDF("id","data")

|-- id: integer (nullable = false)
|-- data: array (nullable = true)
|    |-- element: double (containsNull = false)


df.withColumn("data_size",size($"data")).show

+---+--------------------+---------+
| id|                data|data_size|
+---+--------------------+---------+
|  1|[0.77845301260182...|      217|
|  2|[0.28806915178410...|      202|
|  3|[0.76304121847720...|      165|
|  4|[0.57955190088558...|        9|
|  5|[0.82134215959459...|       11|
|  6|[0.42193739241567...|       57|
|  7|[0.76381645621403...|        4|
|  8|[0.56507523859466...|       93|
|  9|[0.83541853717244...|      107|
| 10|[0.77955626749231...|      111|
| 11|[0.83721643562080...|      223|
| 12|[0.30546029947285...|      116|
| 13|[0.02705462199952...|       46|
| 14|[0.46646815407673...|       41|
| 15|[0.66312488908446...|       16|
| 16|[0.72644646115640...|      166|
| 17|[0.32210572380128...|      197|
| 18|[0.66680355567329...|       61|
| 19|[0.87055594653295...|       55|
| 20|[0.96600507545438...|       89|
+---+--------------------+---------+

Now I want to apply an expensive UDF whose computation time is roughly proportional to the size of the data array. I wonder how I can repartition my data such that each partition holds approximately the same total of records * data_size (i.e., data points, NOT just records).

If I just do df.repartition(100), I may get some partitions containing very large arrays, which then become the bottleneck of the entire Spark stage (all other tasks having already finished). Of course I could just choose an extremely large number of partitions, which would (almost) guarantee that each record ends up in its own partition. But is there another way?

Raphael Roth asked Sep 15 '17

People also ask

How do you evenly distribute data in Spark?

Make a tuple of (item, index), where the index runs over the entire RDD. Swap the key and the value so the RDD contains (index, item) pairs, then repartition to N partitions with a partitioner that sends each pair to partition index % N.
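A minimal Scala sketch of that index-based recipe, using the df from the question; IndexPartitioner is an illustrative name, not a built-in:

import org.apache.spark.Partitioner

// Illustrative partitioner: assumes the key is already the target
// partition index (computed as index % n below).
class IndexPartitioner(n: Int) extends Partitioner {
  override def numPartitions: Int = n
  override def getPartition(key: Any): Int = key.asInstanceOf[Long].toInt
}

val n = 100
val evenlySpread = df.rdd
  .zipWithIndex()                              // (row, index) over the whole RDD
  .map { case (row, idx) => (idx % n, row) }   // swap: key = target partition
  .partitionBy(new IndexPartitioner(n))
  .values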

How do I change the number of partitions in a Spark data frame?

If you want to increase the number of partitions of your DataFrame, all you need to run is the repartition() function. It returns a new DataFrame partitioned according to the given partition count or partitioning expressions.
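For reference, a couple of common repartition() calls on the question's df (the counts are arbitrary examples):

// Repartition to a target number of partitions
val byCount = df.repartition(200)

// Repartition by an expression: rows with the same id land in the same partition
val byColumn = df.repartition(200, $"id")

byCount.rdd.getNumPartitions  // 200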

How are partitions determined in Spark?

A common starting point is to make the number of partitions equal to the number of cores in the cluster, so that all partitions can be processed in parallel and resources are used efficiently.

What is a good number of partitions in Spark?

The general recommendation for Spark is to use about 4x as many partitions as there are cores available to the application; as an upper bound, each task should take at least ~100 ms to execute.


1 Answer

As you said, you can increase the number of partitions. I usually use a multiple of the number of cores: spark context default parallelism * 2-3.
In your case, you could use a bigger multiplier.
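As a quick sketch of that suggestion (the multiplier is whatever works for your workload):

// sc.defaultParallelism is typically the total number of cores of the job;
// use a bigger multiplier so large arrays are less likely to pile up in one partition.
val numPartitions = sc.defaultParallelism * 3
val repartitioned = df.repartition(numPartitions)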

Another solution would be to split your df by filtering, in this way:

  • df with only bigger arrays
  • df with the rest

You could then repartition each of them, perform the computation, and union them back, as in the sketch below.
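A minimal sketch of that split/repartition/union approach, assuming the df from the question, an arbitrary size threshold, and expensiveUdf as a hypothetical stand-in for the real UDF:

import org.apache.spark.sql.functions.size

// Illustrative threshold; pick it from your actual data_size distribution.
val threshold = 100

val withSize = df.withColumn("data_size", size($"data"))
val big      = withSize.filter($"data_size" >= threshold)
val small    = withSize.filter($"data_size" <  threshold)

// Spread the few large rows across many partitions, keep the rest coarser.
val bigPart   = big.repartition(sc.defaultParallelism * 4)
val smallPart = small.repartition(sc.defaultParallelism)

// expensiveUdf is a placeholder for the actual UDF from the question:
// val processed = bigPart.withColumn("result", expensiveUdf($"data"))
//   .union(smallPart.withColumn("result", expensiveUdf($"data")))
val recombined = bigPart.union(smallPart)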

Beware that repartitioning may be expensive since you have large rows to shuffle around.

You could have a look at these slides (27+): https://www.slideshare.net/SparkSummit/custom-applications-with-sparks-rdd-spark-summit-east-talk-by-tejas-patil

They were experiencing very bad data skew and had to handle it in an interesting way.

Michel Lemay answered Oct 13 '22