The following snippet works fine in Spark 2.2.1 but gives a rather cryptic runtime exception in Spark 2.3.0:
import sparkSession.implicits._
import org.apache.spark.sql.functions._
case class X(xid: Long, yid: Int)
case class Y(yid: Int, zid: Long)
case class Z(zid: Long, b: Boolean)
val xs = Seq(X(1L, 10)).toDS()
val ys = Seq(Y(10, 100L)).toDS()
val zs = Seq.empty[Z].toDS()
val j = xs
.join(ys, "yid")
.join(zs, Seq("zid"), "left")
.withColumn("BAM", when('b, "B").otherwise("NB"))
j.show()
In Spark 2.2.1 it prints to the console:
+---+---+---+----+---+
|zid|yid|xid|   b|BAM|
+---+---+---+----+---+
|100| 10|  1|null| NB|
+---+---+---+----+---+
In Spark 2.3.0 it results in:
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'BAM
at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:296)
at org.apache.spark.sql.types.StructType$.fromAttributes(StructType.scala:435)
at org.apache.spark.sql.catalyst.plans.QueryPlan.schema$lzycompute(QueryPlan.scala:157)
...
The culprit really seems to be the Dataset being created from an empty Seq[Z]. When you change that to something that will also result in an empty Dataset[Z], it works as in Spark 2.2.1, e.g.
val zs = Seq(Z(10L, true)).toDS().filter('zid === 999L)
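For comparison, here are the two constructions side by side (same case class Z as above; the names zsFailing and zsWorking are just for illustration). Only the first one triggers the exception on 2.3.0:

// Fails on 2.3.0: empty Dataset built directly from an empty Seq
val zsFailing = Seq.empty[Z].toDS()

// Works as in 2.2.1: a Dataset that merely becomes empty after filtering
val zsWorking = Seq(Z(10L, true)).toDS().filter('zid === 999L)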
The migration guide from 2.2 to 2.3 mentions:
Since Spark 2.3, the Join/Filter’s deterministic predicates that are after the first non-deterministic predicates are also pushed down/through the child operators, if possible. In prior Spark versions, these filters are not eligible for predicate pushdown.
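As I read that note, it is about cases like the following illustrative sketch (df and its integer column x are hypothetical names), where a deterministic predicate appears after a non-deterministic one:

import org.apache.spark.sql.functions.rand

// Since 2.3, the deterministic 'x > 1 may be pushed down through the child
// operators even though it comes after the non-deterministic rand() < 0.5.
val filtered = df.filter(rand() < 0.5 && 'x > 1)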
Is this related, or a (known) bug?
I worked around this on 2.3.0 by issuing someEmptyDataset.cache() right after the empty Dataset was created. With that (zs.cache() in the OP's example), the example no longer failed, and the actual problem at work also went away with this trick.
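For reference, here is a minimal sketch of the workaround applied to the OP's snippet (same case classes and Datasets as in the question; the only change is the cache() call on the empty Dataset):

val zs = Seq.empty[Z].toDS().cache() // cache() right after creating the empty Dataset

val j = xs
  .join(ys, "yid")
  .join(zs, Seq("zid"), "left")
  .withColumn("BAM", when('b, "B").otherwise("NB"))

j.show() // no longer throws the UnresolvedException on 2.3.0

I haven't verified the exact mechanism; presumably cache() causes the plan to be replaced by an InMemoryRelation rather than an empty local relation, which sidesteps whatever the analyzer trips over.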
(As a side note, the OP's code doesn't fail for me on Spark 2.3.2 run locally. I don't see a related fix in the 2.3.2 changelog, though, so it may be due to some other difference in the environment...)