What are the general best practices for filtering a dataframe in pyspark by a given list of values? Specifically:
Depending on the size of the given list of values, when is it best, with respect to runtime, to use isin vs inner join vs broadcast?
This question is the spark analogue of the following question in Pig:
Pig: efficient filtering by loaded list
Additional context:
Pyspark isin function
Considering import pyspark.sql.functions as psf, there are two types of broadcasting:
sc.broadcast() copies Python objects to every node, for a more efficient use of isin (note that isin is a method on Column, not a function in psf).
psf.broadcast, used inside a join, copies your pyspark dataframe to every node when the dataframe is small: df1.join(psf.broadcast(df2)). It is usually used for cartesian products (CROSS JOIN in Pig).
In the context question, the filtering was done using the column of another dataframe, hence the possible solution with a join.
Keep in mind that if your filtering list is relatively big, searching through it will take a while, and since it has to be done for each row it can quickly get costly.
Joins, on the other hand, involve two dataframes that will be sorted before matching, so if your list is small enough you might not want to sort a huge dataframe just for a filter.
Both join and isin work well for all my daily use cases.
Note: if you have a large dataset (say ~500 GB) and you want to filter it and then process the filtered dataset, then:
With isin, the amount of data read and processed is much lower, so it is fast. The whole 500 GB will not be loaded, because the isin filter has already reduced it to the smaller dataset.
With a join, the whole 500 GB will be loaded and processed, so the processing time will be much higher.
In my case:
Filtering with isin, then processing and converting to a Pandas DF, took < 60 secs.
Filtering with a JOIN, then processing and converting to a Pandas DF, took > 1 hour.