Logo Questions Linux Laravel Mysql Ubuntu Git Menu

Spark SQL alternatives to groupby/pivot/agg/collect_list using foldLeft & withColumn so as to improve performance

I have a Spark DataFrame consisting of three columns:

 id | col1 | col2 
 x  |  p1  |  a1  
 x  |  p2  |  b1
 y  |  p2  |  b2
 y  |  p2  |  b3
 y  |  p3  |  c1

After applying df.groupBy("id").pivot("col1").agg(collect_list("col2")) I am getting the following dataframe (aggDF):

| id|  p1|      p2|  p3|
|  x|[a1]|    [b1]|  []|
|  y|  []|[b2, b3]|[c1]|

Then I find the name of columns except the id column.

val cols = aggDF.columns.filter(x => x != "id")

After that I am using cols.foldLeft(aggDF)((df, x) => df.withColumn(x, when(size(col(x)) > 0, col(x)).otherwise(lit(null)))) to replace empty array with null. The performance of this code becomes poor when the number of columns increases. Additionally, I have the name of string columns val stringColumns = Array("p1","p3"). I want to get the following final dataframe:

| id|  p1|      p2|  p3|
|  x| a1 |    [b1]|null|
|  y|null|[b2, b3]| c1 |

Is there any better solution to this problem in order to achieve the final dataframe?

like image 666
Abir Chokraborty Avatar asked Nov 07 '22 11:11

Abir Chokraborty

1 Answers

You current code pays 2 performance costs as structured:

  • As mentioned by Alexandros, you pay 1 catalyst analysis per DataFrame transform so if you loop other a few hundreds or thousands columns, you'll notice some time spent on the driver before the job is actually submitted. If this is a critical issue for you, you can use a single select statement instead of your foldLeft on withColumns but this won't really change a lot the execution time because of the next point

  • When you use an expression such as when().otherwise() on columns in what can be optimized as a single select statement, the code generator will produce a single large method processing all the columns. If you have more than a couple hundred columns, it's likely that the resulting method won't be JIT-compiled by default by the JVM, resulting in very slow execution performance (max JIT-able method is 8k bytecode in Hotspot).

You can detect if you hit the second issue by inspecting the executor logs and check if you see a WARNING on a too large method that can't be JITed.

How to try and solve this ?

1 - Changing the logic

You can filter the empty cells before the pivot by using a window transform

import org.apache.spark.sql.expressions.Window

val finalDf = df
  .withColumn("count", count('col2) over Window.partitionBy('id,'col1)) 
  .filter('count > 0)

This may or may not be faster depending on actual dataset as the pivot also generates a large select statement expression by itself so it may hit the large method threshold if you encounter more than approximately 500 values for col1. You may want to combine this with option 2 as well.

2 - Try and finesse the JVM

You can add an extraJavaOption on your executors to ask the JVM to try and JIT hot methods larger than 8k.

For example, add the option --conf "spark.executor.extraJavaOptions=-XX:-DontCompileHugeMethods" on your spark-submit and see how it impacts the pivot execution time.

It's difficult to guarantee a substantial speed increase without more details on your real dataset but it's definitely worth a shot.

like image 180
rluta Avatar answered Nov 15 '22 11:11
