
PySpark - pass a value from another column as the parameter of spark function

I have a Spark DataFrame that looks like this, where expr is a SQL/Hive filter expression.

+-------------------------+---------+-----+
|expr                     |var1     |var2 |
+-------------------------+---------+-----+
|var1 > 7                 |9        |0    |
|var1 > 7                 |9        |0    |
|var1 > 7                 |9        |0    |
|var1 > 7                 |9        |0    |
|var1 = 3 AND var2 >= 0   |9        |0    |
|var1 = 3 AND var2 >= 0   |9        |0    |
|var1 = 3 AND var2 >= 0   |9        |0    |
|var1 = 3 AND var2 >= 0   |9        |0    |
|var1 = 2 AND var2 >= 0   |9        |0    |
+-------------------------+---------+-----+

I want to transform it into the DataFrame below, where flag is the boolean value obtained by evaluating the expression in the 'expr' column:

+-------------------------+---------+-----+---------+
|expr                     |var1     |var2 |flag     |
+-------------------------+---------+-----+---------+
|var1 > 7                 |9        |0    |  True   |
|var1 > 7                 |9        |0    |  True   |
|var1 > 7                 |9        |0    |  True   |
|var1 > 7                 |9        |0    |  True   |
|var1 = 3 AND var2 >= 0   |9        |0    |     .   |
|var1 = 3 AND var2 >= 0   |9        |0    |     .   |
|var1 = 3 AND var2 >= 0   |9        |0    |     .   |
|var1 = 3 AND var2 >= 0   |9        |0    |     .   |
|var1 = 2 AND var2 >= 0   |9        |0    |     .   |
+-------------------------+---------+-----+---------+

I have tried using the expr function like this:

df.withColumn('flag', expr(col('expr')))

It fails, as expected, because the expr function expects a string as its parameter.

Another idea I considered was writing a UDF and passing the 'expr' column's value to it, but that would not let me use PySpark's expr function, since a UDF runs plain Python code outside Spark SQL.

What should my approach be? Any suggestions please?

asked Jun 19 '20 by UtkarshSahu



1 Answer

So here's a PySpark solution without a UDF. In Scala I believe you could use map or foldLeft with the same logic.

from pyspark.sql.functions import col, expr, lit, when

# Collect the distinct filter expressions as a list of strings
exprs = [row['expr'] for row in df.select('expr').distinct().collect()]

# Evaluate each expression on its matching rows; otherwise() keeps earlier results
df = df.withColumn('test', lit(None).cast('boolean'))
for ex in exprs:
    df = df.withColumn('test',
                       when(col('expr') == lit(ex), expr(ex)).otherwise(col('test')))

df.show()
+--------------------+----+----+-----+
|                expr|var1|var2| test|
+--------------------+----+----+-----+
|            var1 > 7|   9|   0| true|
|            var1 > 7|   9|   0| true|
|            var1 > 7|   9|   0| true|
|            var1 > 7|   9|   0| true|
|var1 = 3 AND var2...|   9|   0|false|
|var1 = 3 AND var2...|   9|   0|false|
|var1 = 3 AND var2...|   9|   0|false|
|var1 = 3 AND var2...|   9|   0|false|
|var1 = 2 AND var2...|   9|   0|false|
+--------------------+----+----+-----+
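Incidentally, the foldLeft idea mentioned above maps naturally onto functools.reduce in Python. This is only a sketch equivalent to the loop above, reusing the exprs list and the same df:

from functools import reduce
from pyspark.sql.functions import col, expr, lit, when

# Fold the expressions into chained withColumn calls, same result as the loop
df = reduce(
    lambda acc, ex: acc.withColumn(
        'test', when(col('expr') == lit(ex), expr(ex)).otherwise(col('test'))),
    exprs,
    df.withColumn('test', lit(None).cast('boolean')),
)
df.show()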

I should point out that I don't understand why the OP wants to do this; with better context about the problem, I'm sure there's a better way.

Looping in driver code isn't usually the most efficient approach, but here the loop runs over the handful of distinct expressions rather than over the data, so Spark folds all of the withColumn calls into a single plan. The single collect() added only about 2 seconds to the execution time on a DataFrame of 20+ million rows.
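If you want to check that yourself, a quick sketch (assuming the df from the loop above is in scope) is to print the query plan; the chained withColumn projections should collapse into a single Project in the physical plan rather than one pass per expression:

# Sketch: inspect the plan built by the loop above. Catalyst collapses the
# chained withColumn projections, so the physical plan should contain a
# single Project over the source rather than one stage per expression.
df.explain(True)  # True prints the parsed/analyzed/optimized/physical plans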


UPDATE:

I understand the problem a bit better now. This version will be faster, because Spark evaluates all of the filter expressions at once and then coalesces them into one column.

from pyspark.sql.functions import coalesce, col, expr, lit, when

# Tip: perform the collect on the smaller DF that contains the filter expressions
exprs = [row['expr'] for row in df.select('expr').distinct().collect()]

# One when() per distinct expression; coalesce keeps the branch that matched
df = df.withColumn('filter',
                   coalesce(*[when(col('expr') == lit(ex), expr(ex)) for ex in exprs])
                   )
df.show()
+--------------------+----+----+------+
|                expr|var1|var2|filter|
+--------------------+----+----+------+
|            var1 > 7|   9|   0|  true|
|            var1 > 7|   9|   0|  true|
|            var1 > 7|   9|   0|  true|
|            var1 > 7|   9|   0|  true|
|var1 = 3 AND var2...|   9|   0| false|
|var1 = 3 AND var2...|   9|   0| false|
|var1 = 3 AND var2...|   9|   0| false|
|var1 = 3 AND var2...|   9|   0| false|
|var1 = 2 AND var2...|   9|   0| false|
+--------------------+----+----+------+
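For reference, here is a minimal, self-contained run of the coalesce approach on the sample data from the question, with the result column named flag as the OP asked. It is a sketch that assumes an existing SparkSession bound to spark:

from pyspark.sql.functions import coalesce, col, expr, lit, when

# Recreate the question's sample data (assumes a SparkSession named `spark`)
df = spark.createDataFrame(
    [('var1 > 7', 9, 0)] * 4
    + [('var1 = 3 AND var2 >= 0', 9, 0)] * 4
    + [('var1 = 2 AND var2 >= 0', 9, 0)],
    ['expr', 'var1', 'var2'],
)

# One when(...) per distinct expression; coalesce keeps the branch whose
# string matches the row's own 'expr' value
exprs = [row['expr'] for row in df.select('expr').distinct().collect()]
df = df.withColumn(
    'flag',
    coalesce(*[when(col('expr') == lit(ex), expr(ex)) for ex in exprs]),
)
df.show(truncate=False)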
answered Sep 28 '22 by Topde