
Why does Spark fail with "Detected cartesian product for INNER join between logical plans"?

I am using Spark 2.1.0.

When I execute the following code I'm getting an error from Spark. Why? How to fix it?

val i1 = Seq(("a", "string"), ("another", "string"), ("last", "one")).toDF("a", "b")
val i2 = Seq(("one", "string"), ("two", "strings")).toDF("a", "b")
val i1Idx = i1.withColumn("sourceId", lit(1))
val i2Idx = i2.withColumn("sourceId", lit(2))
val input = i1Idx.union(i2Idx)
val weights = Seq((1, 0.6), (2, 0.4)).toDF("sourceId", "weight")

weights.join(input, "sourceId").show

Error:

scala> weights.join(input, "sourceId").show
org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
Project [_1#34 AS sourceId#39, _2#35 AS weight#40]
+- Filter (((1 <=> _1#34) || (2 <=> _1#34)) && (_1#34 = 1))
   +- LocalRelation [_1#34, _2#35]
and
Union
:- Project [_1#0 AS a#5, _2#1 AS b#6]
:  +- LocalRelation [_1#0, _2#1]
+- Project [_1#10 AS a#15, _2#11 AS b#16]
   +- LocalRelation [_1#10, _2#11]
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$19.applyOrElse(Optimizer.scala:1011)
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$19.applyOrElse(Optimizer.scala:1008)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:287)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:277)
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts.apply(Optimizer.scala:1008)
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts.apply(Optimizer.scala:993)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
  at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
  at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
  at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
  at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2791)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2112)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2327)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:636)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:595)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:604)
  ... 48 elided
asked Jun 26 '17 by Marsellus Wallace




2 Answers

You can allow the inner join by turning on the cross-join flag:

spark.conf.set("spark.sql.crossJoin.enabled", "true") 
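For instance, continuing the spark-shell session from the question (a sketch, assuming the weights and input DataFrames defined above and the implicit spark session), you would set the flag and then retry the join:

// enable cartesian products globally for this session, then retry the original inner join
spark.conf.set("spark.sql.crossJoin.enabled", "true")
weights.join(input, "sourceId").show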

You could also use an explicit cross join:

weights.crossJoin(input) 
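Note that a bare crossJoin pairs every row of weights with every row of input. Once cartesian products are allowed (via the flag or the upgrade below), you can recover the rows the inner join was meant to produce by filtering on the key afterwards. A sketch, assuming the DataFrames from the question and referencing the columns through their source DataFrames to disambiguate the duplicated sourceId:

// cross product of the two DataFrames, then keep only the rows where the keys match
weights.crossJoin(input)
  .where(weights("sourceId") === input("sourceId"))
  .show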

Alternatively, you can spell out the join condition and pass "cross" as the join type:

weights.join(input, input("sourceId")===weights("sourceId"), "cross") 

You can find more about this in SPARK-6459, which is said to be fixed in 2.1.1.

Since you are on 2.1.0, upgrading to 2.1.1 should resolve the issue.

answered Oct 20 '22 by koiralo


tl;dr Upgrade to Spark 2.1.1. It's an issue in Spark that was fixed.

(I really wish I could also show you the exact change that fixed it in 2.1.1.)
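If you are unsure which release your shell is actually running, a quick sanity check in spark-shell (where spark is the implicit SparkSession) is:

// prints the running Spark version, e.g. 2.1.0 before the upgrade, 2.1.1 after
println(spark.version)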

answered Oct 20 '22 by Jacek Laskowski