Spark 2.0.0 Error: PartitioningCollection requires all of its partitionings have the same numPartitions

I'm joining some DataFrames together in Spark and I keep getting the following error:

PartitioningCollection requires all of its partitionings have the same numPartitions.

It happens after I join two DataFrames that each look fine on their own: I can work with either one separately, but as soon as I try to fetch a row from the joined DataFrame, the error is thrown. I'm really just trying to understand why this error appears and what it means, as I can't find any documentation on it.

The following invocation results in this exception:

val resultDataFrame = dataFrame1
  .join(dataFrame2, $"first_column" === $"second_column")
  .take(2)

but I can certainly call

dataFrame1.take(2)

and

dataFrame2.take(2)

I also tried repartitioning the DataFrames, using Dataset.repartition(numPartitions) or Dataset.coalesce(numPartitions) on dataFrame1 and dataFrame2 before joining, and on resultDataFrame after the join, but nothing affected the error. I haven't been able to find any reference to other people hitting this error after some cursory googling...
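
For example, the repartitioning attempts looked roughly like this (an illustrative sketch; the partition count of 200 is arbitrary, and the $ column syntax assumes import spark.implicits._ is in scope):

// Attempted workaround: force both sides onto the same number of
// partitions before the join (this did not make the error go away here).
val df1 = dataFrame1.repartition(200)
val df2 = dataFrame2.repartition(200)
df1.join(df2, $"first_column" === $"second_column").take(2)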

asked Sep 29 '16 by Clemente Cuevas

4 Answers

I've had the same problem. For me it occurred after removing some columns from the select part of a join (not from the join condition itself).

I was able to fix it by calling .repartition() on the DataFrame.
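
Something along these lines (a sketch; repartitioning the joined DataFrame, with an arbitrary partition count of 200):

// Repartition the result of the join before collecting rows.
val joined = dataFrame1
  .join(dataFrame2, $"first_column" === $"second_column")
  .repartition(200)
joined.take(2)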

answered Nov 20 '22 by Nick Lothian


This problem is related to the ReorderJoinPredicates rule and was fixed in Spark 2.3.0.
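
If you can upgrade, bumping the Spark version in your build is enough. For example, assuming an sbt build:

// build.sbt: move to a Spark release that contains the fix (2.3.0 or later)
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.0"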

answered Nov 20 '22 by seaman29


I have encountered the same issue in the last few days, and I was disappointed to find no references on the internet. Until yours!

A couple of things I would add: I get the error after a fairly complicated set of operations on DataFrames (multiple joins), and those operations involve DataFrames derived from the same parent DataFrame. I'm trying to put together a minimal example that reproduces it, but extracting one from my pipeline is not trivial.

I suspect Spark may have trouble computing a correct plan when the DAG gets too complicated. Unfortunately, if it is a bug in Spark 2.0.0, the nightly builds don't seem to have fixed it yet (I tried a 2.0.2 snapshot a couple of days ago).

A practical solution that fixes the issue (temporarily) seems to be: at some point in your pipeline, write some of your DataFrames to disk and read them back. This forces Spark to optimize a much smaller, more manageable plan, and it no longer crashes. Of course, it's only a temporary fix.
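
Concretely, the workaround looks something like this (a sketch; the path and the variable names are placeholders):

// Materialize an intermediate DataFrame to disk and read it back,
// which truncates the lineage/plan that Spark has to optimize.
intermediateDf.write.parquet("/tmp/pipeline_checkpoint")
val truncated = spark.read.parquet("/tmp/pipeline_checkpoint")
// continue the pipeline from `truncated` instead of `intermediateDf`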

answered Nov 20 '22 by L.T.


Do you call the cache method?

This problem happens to me only when I use the cache method. If I don't call it, I can use the data without any problem.
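
For reference, the difference was just the presence of the cache call, roughly like this (names are illustrative):

// With the cache call the join failed; without it the same code ran fine.
val cached = dataFrame1.cache()
cached.join(dataFrame2, $"first_column" === $"second_column").take(2)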

answered Nov 20 '22 by Luis A.G.