I want to join data twice as below:
rdd1 = spark.createDataFrame([(1, 'a'), (2, 'b'), (3, 'c')], ['idx', 'val'])
rdd2 = spark.createDataFrame([(1, 2, 1), (1, 3, 0), (2, 3, 1)], ['key1', 'key2', 'val'])
res1 = rdd1.join(rdd2, on=[rdd1['idx'] == rdd2['key1']])
res2 = res1.join(rdd1, on=[res1['key2'] == rdd1['idx']])
res2.show()
Then I get some error :
pyspark.sql.utils.AnalysisException: u'Cartesian joins could be prohibitively expensive and are disabled by default. To explicitly enable them, please set spark.sql.crossJoin.enabled = true;'
But I think this is not a cross join
UPDATE:
res2.explain()
== Physical Plan ==
CartesianProduct
:- *SortMergeJoin [idx#0L, idx#0L], [key1#5L, key2#6L], Inner
: :- *Sort [idx#0L ASC, idx#0L ASC], false, 0
: : +- Exchange hashpartitioning(idx#0L, idx#0L, 200)
: : +- *Filter isnotnull(idx#0L)
: : +- Scan ExistingRDD[idx#0L,val#1]
: +- *Sort [key1#5L ASC, key2#6L ASC], false, 0
: +- Exchange hashpartitioning(key1#5L, key2#6L, 200)
: +- *Filter ((isnotnull(key2#6L) && (key2#6L = key1#5L)) && isnotnull(key1#5L))
: +- Scan ExistingRDD[key1#5L,key2#6L,val#7L]
+- Scan ExistingRDD[idx#40L,val#41]
To avoid Cartesian product, a SQL query that joins N tables must have N-1 join conditions. Join condition is missing or trivial. Into each of your Spark driver application code.
In Spark, the Cartesian function generates a Cartesian product of two datasets and returns all the possible combination of pairs. Here, each element of one dataset is paired with each element of another dataset.
CARTESIAN JOIN: The CARTESIAN JOIN is also known as CROSS JOIN. In a CARTESIAN JOIN there is a join for each row of one table to every row of another table. This usually happens when the matching column or WHERE condition is not specified.
To avoid a Cartesian product, you must specify how the tables should be combined. Typically, you want to pair rows based on matching values in one or more key columns of each table.
This happens because you join
structures sharing the same lineage and this leads to a trivially equal condition:
res2.explain()
== Physical Plan ==
org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
Join Inner, ((idx#204L = key1#209L) && (key2#210L = idx#204L))
:- Filter isnotnull(idx#204L)
: +- LogicalRDD [idx#204L, val#205]
+- Filter ((isnotnull(key2#210L) && (key2#210L = key1#209L)) && isnotnull(key1#209L))
+- LogicalRDD [key1#209L, key2#210L, val#211L]
and
LogicalRDD [idx#235L, val#236]
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
In case like this you should use aliases:
from pyspark.sql.functions import col
rdd1 = spark.createDataFrame(...).alias('rdd1')
rdd2 = spark.createDataFrame(...).alias('rdd2')
res1 = rdd1.join(rdd2, col('rdd1.idx') == col('rdd2.key1')).alias('res1')
res1.join(rdd1, on=col('res1.key2') == col('rdd1.idx')).explain()
== Physical Plan ==
*SortMergeJoin [key2#297L], [idx#360L], Inner
:- *Sort [key2#297L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(key2#297L, 200)
: +- *SortMergeJoin [idx#290L], [key1#296L], Inner
: :- *Sort [idx#290L ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(idx#290L, 200)
: : +- *Filter isnotnull(idx#290L)
: : +- Scan ExistingRDD[idx#290L,val#291]
: +- *Sort [key1#296L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(key1#296L, 200)
: +- *Filter (isnotnull(key2#297L) && isnotnull(key1#296L))
: +- Scan ExistingRDD[key1#296L,key2#297L,val#298L]
+- *Sort [idx#360L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(idx#360L, 200)
+- *Filter isnotnull(idx#360L)
+- Scan ExistingRDD[idx#360L,val#361]
For details see SPARK-6459.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With