
UnresolvedException: Invalid call to dataType on unresolved object when using DataSet constructed from Seq.empty (since Spark 2.3.0)

The following snippet works fine in Spark 2.2.1 but gives a rather cryptic runtime exception in Spark 2.3.0:

import sparkSession.implicits._
import org.apache.spark.sql.functions._

case class X(xid: Long, yid: Int)
case class Y(yid: Int, zid: Long)
case class Z(zid: Long, b: Boolean)

val xs = Seq(X(1L, 10)).toDS()
val ys = Seq(Y(10, 100L)).toDS()
val zs = Seq.empty[Z].toDS()

val j = xs
  .join(ys, "yid")
  .join(zs, Seq("zid"), "left")
  .withColumn("BAM", when('b, "B").otherwise("NB"))

j.show()

In Spark 2.2.1 it prints the following to the console:

+---+---+---+----+---+
|zid|yid|xid|   b|BAM|
+---+---+---+----+---+
|100| 10|  1|null| NB|
+---+---+---+----+---+

In Spark 2.3.0 it results in:

org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'BAM
  at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
  at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
  at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:296)
  at org.apache.spark.sql.types.StructType$.fromAttributes(StructType.scala:435)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.schema$lzycompute(QueryPlan.scala:157)
  ...

The culprit really seems to be the Dataset being created from an empty Seq[Z]. When you change that to something else that also results in an empty Dataset[Z], it works just as in Spark 2.2.1, e.g.

val zs = Seq(Z(10L, true)).toDS().filter('zid === 999L)

The migration guide from 2.2 to 2.3 mentions:

Since Spark 2.3, the Join/Filter’s deterministic predicates that are after the first non-deterministic predicates are also pushed down/through the child operators, if possible. In prior Spark versions, these filters are not eligible for predicate pushdown.

Is this related, or a (known) bug?

Asked Apr 10 '18 by Sam De Backer




1 Answer

I worked around this on 2.3.0 by calling someEmptyDataset.cache() right after the empty Dataset is created. With zs.cache() added, the OP's example no longer fails, and the actual problem I hit at work also went away with this trick.
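Applied to the snippet from the question, the workaround would look roughly like this (just a sketch, reusing the X/Y/Z case classes and the xs/ys Datasets defined above):

import sparkSession.implicits._
import org.apache.spark.sql.functions._

// Workaround: cache the empty Dataset right after creating it,
// before it participates in the join.
val zs = Seq.empty[Z].toDS()
zs.cache()

val j = xs
  .join(ys, "yid")
  .join(zs, Seq("zid"), "left")
  .withColumn("BAM", when('b, "B").otherwise("NB"))

j.show() // no longer throws the UnresolvedException on 2.3.0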

(As a side note, the OP's code doesn't fail for me on Spark 2.3.2 run locally. I don't see a related fix in the 2.3.2 change log, though, so it may be down to some other difference in the environment...)

Answered Sep 20 '22 by ddekany