Filter pyspark dataframe if contains a list of strings

Suppose we have a pyspark dataframe in which one of the columns (column_a) contains some string values, and there is also a list of strings (list_a).

Dataframe:

column_a      | count
some_string   |  10
another_one   |  20
third_string  |  30

list_a:

['string', 'third', ...]

I want to filter this dataframe and only keep the rows where column_a's value contains one of list_a's items.

This is the code that works to filter column_a based on a single string:

df['column_a'].like('%string_value%')

But how can we get the same result for a list of strings? (Keep the rows where column_a's value contains 'string', 'third', ...)

asked Oct 01 '19 by Saeed Esmaili


People also ask

How do you Filter strings in PySpark?

In Spark & PySpark, contains() function is used to match a column value contains in a literal string (matches on part of the string), this is mostly used to filter rows on DataFrame.

How do you Filter data from a DataFrame PySpark?

The PySpark filter() function filters the rows of an RDD/DataFrame based on a given condition or SQL expression. You can also use where() instead of filter() if you are coming from an SQL background; both functions operate exactly the same way.
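As a rough sketch (reusing the illustrative df above), a Column condition passed to filter() and an equivalent SQL string passed to where() return the same rows:

# equivalent ways of keeping rows where count is greater than 15
df.filter(df["count"] > 15).show()
df.where("`count` > 15").show()  # backticks because count is also a SQL function name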

What does .collect do in PySpark?

collect() is the RDD/DataFrame operation used to retrieve the data from a DataFrame. It is useful for retrieving all the elements from every partition of an RDD/DataFrame and bringing them to the driver node/program.
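For illustration (again with the df above), collect() returns a list of Row objects on the driver, so it should only be used on results small enough to fit in driver memory:

# pull all rows back to the driver as a list of Row objects
rows = df.collect()
for row in rows:
    print(row["column_a"], row["count"])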

How do you Filter records from a DataFrame in Spark?

The Spark where() function filters rows from a DataFrame or Dataset based on one or more conditions or a SQL expression. The where() operator can be used instead of filter() when the user has a SQL background. Both where() and filter() operate precisely the same way.
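A small sketch of both styles, combining two conditions (still using the illustrative df):

# Column-expression style: combine conditions with & and |
df.where(df["column_a"].like("%string%") & (df["count"] > 15)).show()

# equivalent SQL-expression style
df.where("column_a LIKE '%string%' AND `count` > 15").show()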


1 Answer

IIUC, you want to return the rows in which column_a is "like" (in the SQL sense) any of the values in list_a.

One way is to use functools.reduce:

from functools import reduce

list_a = ['string', 'third']

df1 = df.where(
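    # OR together one like() condition per pattern in list_a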
    reduce(lambda a, b: a|b, (df['column_a'].like('%'+pat+"%") for pat in list_a))
)
df1.show()
#+------------+-----+
#|    column_a|count|
#+------------+-----+
#| some_string|   10|
#|third_string|   30|
#+------------+-----+

Essentially you loop over all of the possible strings in list_a to compare in like and "OR" the results. Here is the execution plan:

df1.explain()
#== Physical Plan ==
#*(1) Filter (Contains(column_a#0, string) || Contains(column_a#0, third))
#+- Scan ExistingRDD[column_a#0,count#1]

Another option is to use pyspark.sql.Column.rlike instead of like.

df2 = df.where(
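    # join the patterns into a single regex that matches any of them: (string)|(third)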
    df['column_a'].rlike("|".join(["(" + pat + ")" for pat in list_a]))
)

df2.show()
#+------------+-----+
#|    column_a|count|
#+------------+-----+
#| some_string|   10|
#|third_string|   30|
#+------------+-----+

Which has the corresponding execution plan:

df2.explain()
#== Physical Plan ==
#*(1) Filter (isnotnull(column_a#0) && column_a#0 RLIKE (string)|(third))
#+- Scan ExistingRDD[column_a#0,count#1]
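
One caveat worth noting (not from the original answer): rlike treats each entry of list_a as a regular expression, so if the patterns might contain regex metacharacters it is safer to escape them first, for example with re.escape. A minimal sketch:

import re

# escape each pattern so metacharacters are matched literally
safe_pattern = "|".join(re.escape(pat) for pat in list_a)
df3 = df.where(df["column_a"].rlike(safe_pattern))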
answered Oct 11 '22 by pault