
Joining a large and a massive Spark DataFrame

My problem is as follows:

  • I have a large DataFrame called details containing 900K rows, and a massive one named attributes containing 80M rows.

  • Both have a column A on which I would like to do a left-outer join, with details as the left DataFrame.

  • There are only 75K unique entries in column A in the DataFrame details. The DataFrame attributes has 80M unique entries in column A.

What is the best possible way to achieve the join operation?

What have I tried?

  • The simple join, i.e. details.join(attributes, "A", how="left_outer"), just times out (or runs out of memory).

  • Since there are only 75K unique entries in column A in details, we don't care about the rest of the rows in attributes. So, first I filter those out using:

    uniqueA = details.select('A').distinct().collect()
    uniqueA = map(lambda x: x.A, uniqueA)
    attributes_filtered = attributes.filter(attributes.A.isin(*uniqueA))
    

    I thought this would work out because the attributes table comes down from 80M rows to a mere 75K rows. However, the join still takes forever (and it never completes).

  • Next, I thought that there were too many partitions and that the data to be joined was not on the same partitions. Although I don't know how to bring all the data to the same partition, I figured repartitioning might help. So here it goes:

    details_repartitioned = details.repartition("A")
    attributes_repartitioned = attributes.repartition("A")
    
  • The above operation brings down the number of partitions in attributes from 70K to 200. The number of partitions in details is about 1100.

    details_attributes = details_repartitioned.join(
        broadcast(attributes_repartitioned), "A", how='left_outer')  # tried without broadcast too
    

After all this, the join still doesn't work. I am still learning PySpark, so I might have misunderstood the fundamentals behind repartitioning. If someone could shed light on this, it would be great.

P.S. I have already seen this question, but it does not answer my question.

asked Apr 20 '18 by Autonomous

1 Answer

The details table has 900K rows but only 75K distinct entries in column A, so I think the filter on column A you have tried is the right direction. However, collecting the distinct values to the driver and then filtering with

attributes_filtered = attributes.filter(attributes.A.isin(*uniqueA))

is too expensive. An alternate approach would be:

from pyspark import StorageLevel

uniqueA = details.select('A').distinct().persist(StorageLevel.DISK_ONLY)
uniqueA.count()  # force materialization, breaking the DAG lineage
attrJoined = attributes.join(uniqueA, on="A", how="inner")
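
Since uniqueA contains at most 75K single-column rows, you could additionally hint Spark to broadcast it so the 80M-row attributes table is not shuffled for this step, and then finish with the left-outer join the question actually needs. This is only a sketch building on the variables above; the broadcast hint and the final join are an addition, not something the snippet above already does:

from pyspark.sql.functions import broadcast

# uniqueA is a one-column DataFrame with <= 75K rows, small enough to broadcast to every executor.
attrJoined = attributes.join(broadcast(uniqueA), on="A", how="inner")

# The left-outer join from the question, now against a much smaller right-hand side.
result = details.join(attrJoined, on="A", how="left_outer")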

Also, you probably need to set spark.sql.shuffle.partitions to a sensible value if you haven't done that yet.
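
For example (a minimal sketch assuming a SparkSession named spark; 400 is only a placeholder value to tune for your cluster and data size):

# A common starting point is a small multiple of the total number of executor cores.
spark.conf.set("spark.sql.shuffle.partitions", "400")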

One problem that could happen in your dataset is skew: among the 75K unique values, a few keys may each join with a very large number of rows in the attributes table. In that case the join could take much longer and may never finish.

To resolve that, you need to find the skewed values of column A and process them separately.
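
One way to do that is sketched below. It assumes that no columns other than A share names between the two DataFrames, and the skew threshold is purely illustrative: hot keys are split out and joined with a broadcast of the small details slice, while the remaining keys go through a normal shuffle join.

from pyspark.sql import functions as F

# Hypothetical threshold: keys with more rows than this in attributes are treated as skewed.
skew_threshold = 1000000

skewed_keys = (attributes.groupBy("A").count()
               .filter(F.col("count") > skew_threshold)
               .select("A"))

# Split both sides into a skewed part and a well-distributed part.
details_skewed = details.join(F.broadcast(skewed_keys), on="A", how="inner")
details_rest = details.join(F.broadcast(skewed_keys), on="A", how="left_anti")
attributes_skewed = attributes.join(F.broadcast(skewed_keys), on="A", how="inner")
attributes_rest = attributes.join(F.broadcast(skewed_keys), on="A", how="left_anti")

# Normal shuffle join for the well-distributed keys.
joined_rest = details_rest.join(attributes_rest, on="A", how="left_outer")

# The skewed keys all exist in attributes by construction, so this slice of the
# left-outer join is effectively an inner join, and the small details slice can be broadcast.
joined_skewed = attributes_skewed.join(F.broadcast(details_skewed), on="A", how="inner")

result = joined_rest.unionByName(joined_skewed)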

answered Nov 13 '22 by Avishek Bhattacharya