Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Split Spark DataFrame based on condition

I need something similar to the randomSplit function:

val Array(df1, df2) = myDataFrame.randomSplit(Array(0.6, 0.4))

However, I need to split myDataFrame based on a boolean condition. Does anything like the following exist?

val Array(df1, df2) = myDataFrame.booleanSplit(col("myColumn") > 100)

I'd like not to do two separate .filter calls.

like image 476
Marsellus Wallace Avatar asked Jan 31 '17 14:01

Marsellus Wallace


2 Answers

Unfortunately the DataFrame API doesn't have such a method, to split by a condition you'll have to perform two separate filter transformations:

myDataFrame.cache() // recommended to prevent repeating the calculation

val condition = col("myColumn") > 100
val df1 = myDataFrame.filter(condition)
val df2 = myDataFrame.filter(not(condition))
like image 145
Tzach Zohar Avatar answered Oct 21 '22 06:10

Tzach Zohar


I understand that caching and filtering twice looks a bit ugly, but please bear in mind that DataFrames are translated to RDDs, which are evaluated lazily, i.e. only when they are directly or indirectly used in an action.

If a method booleanSplit as suggested in the question existed, the result would be translated to two RDDs, each of which would be evaluated lazily. One of the two RDDs would be evaluated first and the other would be evaluated second, strictly after the first. At the point the first RDD is evaluated, the second RDD would not yet have "come into existence" (EDIT: Just noticed that there is a similar question for the RDD API with an answer that gives a similar reasoning)

To actually gain any performance benefit, the second RDD would have to be (partially) persisted during the iteration of the first RDD (or, actually, during the iteration of the parent RDD of both, which is triggered by the iteration of the first RDD). IMO this wouldn't align overly well with the design of the rest of the RDD API. Not sure if the performance gains would justify this.

I think the best you can achieve is to avoid writing two filter calls directly in your business code, by writing an implicit class with a method booleanSplit as a utility method does that part in a similar way as Tzach Zohar's answer, maybe using something along the lines of myDataFrame.withColumn("__condition_value", condition).cache() so the the value of the condition is not calculated twice.

like image 24
Bernhard Stadler Avatar answered Oct 21 '22 06:10

Bernhard Stadler