My problem is as follows:
I have a large dataframe called details containing 900K rows, and another one named attributes containing 80M rows.
Both have a column A on which I would like to do a left-outer join, the left dataframe being details.
There are only 75K unique entries in column A in the dataframe details. The dataframe attributes has 80M unique entries in column A.
What is the best possible way to achieve the join operation?
What have I tried?
The simple join, i.e. details.join(attributes, "A", how="left_outer"), just times out (or runs out of memory).
Since there are only 75K unique entries in column A in details, we don't care about the rest of the rows in attributes. So, first I filter them out using:
# Collect the ~75K distinct values of A to the driver, then filter attributes on them.
uniqueA = details.select('A').distinct().collect()
uniqueA = map(lambda x: x.A, uniqueA)
attributes_filtered = attributes.filter(attributes.A.isin(*uniqueA))
I thought this would work out because the attributes table comes down from 80M rows to a mere 75K rows. However, the join still takes forever (and never completes).
Next, I thought that there are too many partitions and that the data to be joined is not on the same partitions. Although I don't know how to bring all the data to the same partitions, I figured repartitioning might help. So here it goes:
details_repartitioned = details.repartition("A")
attributes_repartitioned = attributes.repartition("A")
The above operation brings down the number of partitions in attributes from 70K to 200. The number of partitions in details is about 1100.
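(For reference, partition counts can be inspected as below; when repartition("A") is called without an explicit partition count, Spark falls back to spark.sql.shuffle.partitions, which defaults to 200, hence the 200 partitions seen for attributes.)
# Sketch: check partition counts after the repartition calls above.
print(details_repartitioned.rdd.getNumPartitions())
print(attributes_repartitioned.rdd.getNumPartitions())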
from pyspark.sql.functions import broadcast

details_attributes = details_repartitioned.join(
    broadcast(attributes_repartitioned), "A", how='left_outer')  # tried without broadcast too
After all this, the join still doesn't work. I am still learning PySpark, so I might have misunderstood the fundamentals behind repartitioning. If someone could shed light on this, it would be great.
P.S. I have already seen this question, but that one does not answer mine.
The details table has 900K rows with 75K distinct entries in column A. I think filtering on column A, as you have tried, is the right direction. However, collecting the values, mapping them, and then filtering with isin
attributes_filtered = attributes.filter(attributes.A.isin(*uniqueA))
is too expensive. An alternative approach would be:
from pyspark import StorageLevel

uniqueA = details.select('A').distinct().persist(StorageLevel.DISK_ONLY)
uniqueA.count()  # breaking the DAG lineage (forces materialization)
attrJoined = attributes.join(uniqueA, "A", how="inner")
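To get your original left-outer result, you would then join details back with attrJoined. A minimal sketch, reusing the names above (since A is unique in attributes, attrJoined has at most ~75K rows here, so broadcasting it is probably safe, but that is an assumption about your data):
from pyspark.sql.functions import broadcast

# attrJoined now holds only the attribute rows whose A occurs in details.
details_attributes = details.join(broadcast(attrJoined), "A", how="left_outer")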
Also, you probably need to set the shuffle partitions correctly if you haven't done that yet.
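For example (the value 400 is purely illustrative, tune it to your cluster and data size; this assumes spark is your SparkSession):
# Illustrative only: pick a value appropriate for your cluster and data volume.
spark.conf.set("spark.sql.shuffle.partitions", "400")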
One problem that could be happening in your dataset is skew: among the 75K unique values, it could be that only a few of them join with a large number of rows in the attributes table. In that case the join could take much longer and may never finish.
To resolve that, you need to find the skewed values of column A and process them separately.
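A rough sketch of that idea, reusing the names above (the threshold is hypothetical; choose it by inspecting the key counts for your data). The heavy keys are joined against a broadcast of their few matching attribute rows, the remaining keys go through the normal join, and the two results are unioned:
from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast

SKEW_THRESHOLD = 100000  # hypothetical; inspect your key counts to pick this

# Find the values of A that occur unusually often in details.
skewed_keys = (details.groupBy("A").count()
               .filter(F.col("count") > SKEW_THRESHOLD)
               .select("A"))
skewed_values = [row.A for row in skewed_keys.collect()]

# Split both sides into skewed and non-skewed parts.
details_skew = details.filter(F.col("A").isin(skewed_values))
details_rest = details.filter(~F.col("A").isin(skewed_values))
attributes_skew = attrJoined.filter(F.col("A").isin(skewed_values))
attributes_rest = attrJoined.filter(~F.col("A").isin(skewed_values))

# The skewed keys match only a handful of attribute rows, so broadcast those;
# the remaining keys go through the regular shuffle join. Union the results.
joined_skew = details_skew.join(broadcast(attributes_skew), "A", "left_outer")
joined_rest = details_rest.join(attributes_rest, "A", "left_outer")
result = joined_skew.unionByName(joined_rest)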