Filter PySpark DataFrame with a UDF on the entire row

Is there a way to select the entire row as a column to input into a PySpark filter UDF?

I have a complex filtering function "my_filter" that I want to apply to the entire DataFrame:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType

my_filter_udf = udf(lambda r: my_filter(r), BooleanType())
new_df = df.filter(my_filter_udf(col("*")))

But

col("*")

throws an error because that's not a valid operation.

I know that I can convert the DataFrame to an RDD and use the RDD's filter method, but I do NOT want to convert it to an RDD and then back into a DataFrame. My DataFrame has complex nested types, so schema inference fails when I try to convert the RDD into a DataFrame again.
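
For reference, the round-trip I want to avoid would look roughly like this (just a sketch; it assumes an active SparkSession named spark, and passing df.schema explicitly would at least sidestep the failing schema inference, though the conversion cost remains):

# RDD round-trip (the approach I want to avoid)
filtered_rdd = df.rdd.filter(lambda row: my_filter(row))
# reusing the original schema avoids re-inference on the way back
new_df = spark.createDataFrame(filtered_rdd, schema=df.schema)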

asked Dec 11 '22 by user2399973

1 Answer

You can pass each column to the UDF explicitly. For example:

from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType

# create a sample DataFrame (assumes an active SparkContext `sc`)
df = sc.parallelize([
    (1, 'b'),
    (1, 'c'),
]).toDF(["id", "category"])

# simple filter function; inside the UDF the arguments arrive as
# plain Python values, so ordinary comparisons work
@F.udf(returnType=BooleanType())
def my_filter(col1, col2):
    return col1 > 0 and col2 == "b"

df.filter(my_filter('id', 'category')).show()

Results:

+---+--------+
| id|category|
+---+--------+
|  1|       b|
+---+--------+

If you have many columns and you are sure of their order, you can unpack them all at once:

cols = df.columns
df.filter(my_filter(*cols)).show()

Yields the same output.
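
If you specifically need the whole row as a single column (what the question asked for), a common alternative is to bundle the columns into a struct; the UDF then receives one Row-like argument. A sketch along those lines (my_row_filter is a hypothetical name, not part of the original answer):

# pack every column into one struct column; the UDF sees a Row
@F.udf(returnType=BooleanType())
def my_row_filter(row):
    return row['id'] > 0 and row['category'] == "b"

df.filter(my_row_filter(F.struct(*df.columns))).show()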

answered Jan 03 '23 by hamza tuna