Suppose we have a PySpark dataframe in which one of the columns (column_a) contains string values, and we also have a list of strings (list_a).
Dataframe:
column_a     | count
some_string  | 10
another_one  | 20
third_string | 30
list_a:
['string', 'third', ...]
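(For reference, the example data can be reproduced with something like the following; this is just a sketch that assumes a local SparkSession named spark, and the list is shortened to the two patterns shown above.)
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Example data matching the table above
df = spark.createDataFrame(
    [("some_string", 10), ("another_one", 20), ("third_string", 30)],
    ["column_a", "count"],
)
list_a = ["string", "third"]  # the real list may contain more patterns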
I want to filter this dataframe and keep only the rows where column_a's value contains one of list_a's items.
This is the code that works to filter column_a based on a single string:
df['column_a'].like('%string_value%')
But how can we get the same result for a list of strings? (Keep the rows where column_a's value contains 'string', 'third', ...)
In Spark & PySpark, contains() function is used to match a column value contains in a literal string (matches on part of the string), this is mostly used to filter rows on DataFrame.
PySpark filter() function is used to filter the rows from RDD/DataFrame based on the given condition or SQL expression, you can also use where() clause instead of the filter() if you are coming from an SQL background, both these functions operate exactly the same.
Collect() is the function, operation for RDD or Dataframe that is used to retrieve the data from the Dataframe. It is used useful in retrieving all the elements of the row from each partition in an RDD and brings that over the driver node/program.
The Spark where() function is defined to filter rows from the DataFrame or the Dataset based on the given one or multiple conditions or SQL expression. The where() operator can be used instead of the filter when the user has the SQL background. Both the where() and filter() functions operate precisely the same.
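As an illustration of contains() with filter()/where(), here is a minimal sketch against the example df above (the column name is taken from the question):
from pyspark.sql import functions as F

# filter() and where() are interchangeable; contains() does a substring match
df.filter(F.col("column_a").contains("string")).show()
df.where(F.col("column_a").contains("string")).show()   # same rows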
IIUC, you want to return the rows in which column_a is "like" (in the SQL sense) any of the values in list_a.
One way is to use functools.reduce:
from functools import reduce

list_a = ['string', 'third']

# OR together one like() condition per pattern in list_a
df1 = df.where(
    reduce(lambda a, b: a | b, (df['column_a'].like('%' + pat + '%') for pat in list_a))
)
df1.show()
#+------------+-----+
#| column_a|count|
#+------------+-----+
#| some_string| 10|
#|third_string| 30|
#+------------+-----+
Essentially you loop over all of the possible strings in list_a to compare in like and "OR" the results. Here is the execution plan:
df1.explain()
#== Physical Plan ==
#*(1) Filter (Contains(column_a#0, string) || Contains(column_a#0, third))
#+- Scan ExistingRDD[column_a#0,count#1]
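For readers who prefer to see it without reduce, the same filter can be written out by hand; this equivalent sketch uses the two patterns from above, and df1_manual is just an illustrative name:
# Equivalent to the reduce version, with the OR chain spelled out
df1_manual = df.where(
    df['column_a'].like('%string%') | df['column_a'].like('%third%')
)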
Another option is to use pyspark.sql.Column.rlike instead of like.
# Join the patterns into one regular expression: (string)|(third)
df2 = df.where(
    df['column_a'].rlike("|".join(["(" + pat + ")" for pat in list_a]))
)
df2.show()
#+------------+-----+
#| column_a|count|
#+------------+-----+
#| some_string| 10|
#|third_string| 30|
#+------------+-----+
Which has the corresponding execution plan:
df2.explain()
#== Physical Plan ==
#*(1) Filter (isnotnull(column_a#0) && column_a#0 RLIKE (string)|(third))
#+- Scan ExistingRDD[column_a#0,count#1]
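One caveat: rlike treats each entry of list_a as a regular expression, so patterns containing regex metacharacters would need escaping first. A minimal sketch (df3 is an illustrative name):
import re

# Escape any regex metacharacters before joining the patterns
escaped = [re.escape(pat) for pat in list_a]
df3 = df.where(df['column_a'].rlike("|".join(escaped)))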