I am using Spark 2.1.0.
When I execute the following code I get an error from Spark. Why, and how do I fix it?

val i1 = Seq(("a", "string"), ("another", "string"), ("last", "one")).toDF("a", "b")
val i2 = Seq(("one", "string"), ("two", "strings")).toDF("a", "b")
val i1Idx = i1.withColumn("sourceId", lit(1))
val i2Idx = i2.withColumn("sourceId", lit(2))
val input = i1Idx.union(i2Idx)
val weights = Seq((1, 0.6), (2, 0.4)).toDF("sourceId", "weight")
weights.join(input, "sourceId").show
Error:
scala> weights.join(input, "sourceId").show
org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
Project [_1#34 AS sourceId#39, _2#35 AS weight#40]
+- Filter (((1 <=> _1#34) || (2 <=> _1#34)) && (_1#34 = 1))
   +- LocalRelation [_1#34, _2#35]
and
Union
:- Project [_1#0 AS a#5, _2#1 AS b#6]
:  +- LocalRelation [_1#0, _2#1]
+- Project [_1#10 AS a#15, _2#11 AS b#16]
   +- LocalRelation [_1#10, _2#11]
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$19.applyOrElse(Optimizer.scala:1011)
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$19.applyOrElse(Optimizer.scala:1008)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:287)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:277)
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts.apply(Optimizer.scala:1008)
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts.apply(Optimizer.scala:993)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
  at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
  at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
  at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
  at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2791)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2112)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2327)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:636)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:595)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:604)
  ... 48 elided
In Spark, a Cartesian (cross) join generates the Cartesian product of two datasets and returns every possible combination of pairs: each element of one dataset is paired with each element of the other.
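As a minimal illustration (the column names l and r are made up for this sketch, and spark.implicits._ is assumed to be in scope, as it is in spark-shell), a 3-row DataFrame crossed with a 2-row DataFrame yields 3 × 2 = 6 rows:

val left  = Seq("a", "b", "c").toDF("l")   // 3 rows
val right = Seq(1, 2).toDF("r")            // 2 rows
left.crossJoin(right).count()              // 6: every possible (l, r) pair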
Spark >= 3.0: spark.sql.crossJoin.enabled is true by default (SPARK-28621).

Spark >= 2.1: You can use crossJoin:

df1.crossJoin(df2)

It makes your intention explicit and keeps the more conservative configuration in place to protect you from unintended cross joins.

Spark 2.0.x: You can enable cross joins with the spark.sql.crossJoin.enabled configuration option.
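Applied to the DataFrames from the question, the explicit cross join could look roughly like this (a sketch; the where clause re-adds the key equality that Spark complained was missing from the optimized plan):

weights.crossJoin(input)
  .where(weights("sourceId") === input("sourceId"))
  .show()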
You can trigger the inner join after turning on the flag:
spark.conf.set("spark.sql.crossJoin.enabled", "true")
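For completeness, a sketch of the whole sequence in one spark-shell session; note the flag is session-wide, so you may want to restore the default afterwards so the safety check still protects your other queries:

spark.conf.set("spark.sql.crossJoin.enabled", "true")
weights.join(input, "sourceId").show   // the original inner join now passes the check
spark.conf.set("spark.sql.crossJoin.enabled", "false")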
You could also use a cross join:
weights.crossJoin(input)
or pass the join type explicitly as "cross":
weights.join(input, input("sourceId")===weights("sourceId"), "cross")
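One thing to watch with the explicit-condition forms above: both sides keep their own sourceId column, so you may want to drop one of them to get back to the single-column shape that weights.join(input, "sourceId") would produce. A sketch:

val joined = weights
  .join(input, input("sourceId") === weights("sourceId"), "cross")
  .drop(input("sourceId"))
joined.show()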
You can find more about this issue in SPARK-6459, which is said to be fixed in Spark 2.1.1.
Since you are on 2.1.0, upgrading to 2.1.1 or later should resolve it.
tl;dr Upgrade to Spark 2.1.1. It's an issue in Spark that was fixed.
(I really wish I could also show you the exact change that fixed this in 2.1.1.)