I'm joining some DataFrames together in Spark and I keep getting the following error:
PartitioningCollection requires all of its partitionings have the same numPartitions.
It happens after I join two DataFrames that each look perfectly reasonable on their own, but as soon as I try to fetch a row from the joined DataFrame, I get this error. I'm really just trying to understand why this error appears and what it means, since I can't find any documentation on it.
The following invocation results in this exception:
val resultDataframe = dataFrame1
  .join(dataFrame2, $"first_column" === $"second_column")
  .take(2)
but I can certainly call
dataFrame1.take(2)
and
dataFrame2.take(2)
I also tried repartitioning the DataFrames, using Dataset.repartition(numPartitions) or Dataset.coalesce(numPartitions), on dataFrame1 and dataFrame2 before the join and on resultDataframe after the join (roughly as sketched below), but nothing had any effect on the error. I haven't been able to find any reference to other people hitting this error after some cursory googling...
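For reference, the repartitioning attempts looked roughly like this (a sketch; the partition count of 200 is just a placeholder, and I tried it on either side of the join):

val repartitioned1 = dataFrame1.repartition(200)   // also tried .coalesce(200)
val repartitioned2 = dataFrame2.repartition(200)
val resultDataframe = repartitioned1
  .join(repartitioned2, $"first_column" === $"second_column")
resultDataframe.repartition(200).take(2)           // still fails with the same exception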
I've also had the same problem. For me it occurred after removing some columns from the select part of a join (not the join clause itself).
I was able to fix it by calling .repartition() on the DataFrame.
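A rough sketch of what I mean, reusing the names from the question (it's an assumption on my part that the repartition goes on the joined DataFrame, and the partition count is arbitrary):

val joined = dataFrame1
  .join(dataFrame2, $"first_column" === $"second_column")
  .select($"first_column")   // dropping columns in the select is what triggered the error for me
  .repartition(200)          // adding this made the error go away
joined.take(2)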
This problem is related to the ReorderJoinPredicates rule and was fixed in Spark 2.3.0.
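If you're not sure which version you're running, you can check it from the session (a quick sketch; spark here is your SparkSession):

println(spark.version)   // anything below 2.3.0 can still hit this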
I have encountered the same issue in the last few days, and I was disappointed when I found no references on the internet. Until yours!
A couple of things I would add: I get the error after a fairly complicated set of operations on DataFrames (multiple joins), and those operations involve DataFrames that are generated from the same parent DataFrame. I'm trying to put together a minimal example that reproduces it, but it's not trivial to extract from my pipeline.
I suspect Spark might have trouble computing a correct plan when the DAG gets too complicated. Unfortunately, if it is a bug in Spark 2.0.0, the nightly builds don't seem to have fixed it yet (I tried a 2.0.2 snapshot a couple of days ago).
A practical workaround that (temporarily) fixes the issue: write some of the DataFrames in your pipeline to disk at some point and read them back. This forces Spark to optimize a much smaller, more manageable plan, and it stops crashing. Of course, it's only a temporary fix.
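Roughly what I mean, as a sketch only (the path, format, and DataFrame names are placeholders, not from my actual pipeline):

import spark.implicits._   // for the $ column syntax

// Write an intermediate DataFrame to disk to cut the plan/lineage, then read it back.
intermediateDf.write.mode("overwrite").parquet("/tmp/pipeline_checkpoint")
val reloadedDf = spark.read.parquet("/tmp/pipeline_checkpoint")

// Continue the pipeline from the reloaded DataFrame; Spark now optimizes a fresh, small plan.
val result = reloadedDf.join(otherDf, $"first_column" === $"second_column")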
Do you call the cache method?
This problem happens to me only when I use the cache method. If I don't call it, I can use the data without any problem.
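If that's the case for you too, the difference would look roughly like this (a sketch reusing the names from the question; where exactly the cache call sits in your pipeline may differ):

// With the inputs cached, the join hit the error for me:
dataFrame1.cache()
dataFrame2.cache()
dataFrame1.join(dataFrame2, $"first_column" === $"second_column").take(2)   // PartitioningCollection error

// Without the cache calls (or after undoing them with unpersist()), the same join works:
dataFrame1.unpersist()
dataFrame2.unpersist()
dataFrame1.join(dataFrame2, $"first_column" === $"second_column").take(2)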