I'm receiving an exception indicating that I need to start the stream before using it. However, the stream is being started at the end of the chain. What's wrong with this setup?
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBootstrapServers)
.option("subscribe", "inputTopic")
.option("startingOffsets", "earliest")
.load
.selectExpr(deserializeKeyExpression, deserializeValueExpression)
.select("value.*")
.withColumn("date", to_timestamp(from_unixtime(col("date"))))
.transform(model.transform)
.select(col("id") as "key", func(col(config.probabilityCol)) as "value.prediction")
.selectExpr(serializeKeyExpression, serializeValueExpression)
.writeStream
.outputMode("update")
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBootstrapServers)
.option("checkpointLocation", "checkpoint")
.option("topic", "outputTopic")
.start
Here's the exception:
Caused by: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:374)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:37)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
...
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:35)
at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:51)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:62)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3249)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2491)
at org.apache.spark.sql.Dataset.first(Dataset.scala:2498)
at org.apache.spark.ml.feature.VectorAssembler.first$lzycompute$1(VectorAssembler.scala:57)
at org.apache.spark.ml.feature.VectorAssembler.org$apache$spark$ml$feature$VectorAssembler$$first$1(VectorAssembler.scala:57)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply$mcI$sp(VectorAssembler.scala:88)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply(VectorAssembler.scala:88)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply(VectorAssembler.scala:88)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:88)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:58)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
at org.apache.spark.ml.feature.VectorAssembler.transform(VectorAssembler.scala:58)
at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:306)
at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:306)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
at org.apache.spark.ml.PipelineModel.transform(Pipeline.scala:306)
at com.company.project.Stream$$anonfun$transform$1.apply(NewsRateJob.scala:65)
at com.company.project.Stream$$anonfun$transform$1.apply(NewsRateJob.scala:65)
at org.apache.spark.sql.Dataset.transform(Dataset.scala:2513)
at com.company.project.Stream.transform(NewsRateJob.scala:65)
at com.company.project.Stream.setupStream(NewsRateJob.scala:47)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:366)
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:311)
at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:134)
... 18 common frames omitted
I'm familiar with the issues Spark 2.2 had with the VectorAssembler, but I am using Spark 2.3.1.
This exception occurred because the model was trying to access the stream's data before the stream was started. In this case, the VectorAssembler was calling first on the dataset in order to determine how wide the vector is.
Spark 2.3 does not automatically fix problems between the VectorAssembler and structured streaming; it simply provides a class, VectorSizeHint, which can be used in conjunction with the VectorAssembler in a streaming pipeline. Adding this to the stages of the pipeline fixed the problem.
import org.apache.spark.ml.feature.VectorSizeHint

// Declares the vector width up front, so VectorAssembler never inspects the data.
stages += new VectorSizeHint()
  .setInputCol(column)
  .setSize(size)
Here's some documentation which shows how it can be used: https://docs.databricks.com/spark/latest/mllib/mllib-pipelines-and-stuctured-streaming.html
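For context, here is a minimal sketch of how VectorSizeHint slots into a pipeline ahead of the VectorAssembler (the column names and size are hypothetical placeholders, not taken from our job):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{VectorAssembler, VectorSizeHint}

// Hypothetical column that already holds a fixed-length Vector.
val sizeHint = new VectorSizeHint()
  .setInputCol("features_raw")
  .setSize(3) // declared width; must match the actual vector length

val assembler = new VectorAssembler()
  .setInputCols(Array("features_raw", "some_numeric_col"))
  .setOutputCol("features")

// With the hint in place, VectorAssembler no longer calls first() on the
// dataset to discover the vector width, so the same pipeline can be
// applied to a streaming DataFrame.
val pipeline = new Pipeline().setStages(Array(sizeHint, assembler))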
Note: this is not required when using the OneHotEncoderEstimator.
We experienced similar stack traces for a couple of other reasons as well: one because our model used the deprecated OneHotEncoder (which needed to be updated to the OneHotEncoderEstimator), the other because we were caching the pipeline (we removed the cache step).
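For reference, the OneHotEncoderEstimator replacement looked roughly like this (a sketch with hypothetical column names). Unlike the old transformer, it is an Estimator, so it learns the category sizes at fit time rather than inspecting the data at transform time, which is what makes it safe for streaming:

import org.apache.spark.ml.feature.OneHotEncoderEstimator

// Spark 2.3 replacement for the deprecated OneHotEncoder transformer;
// in 2.3 it takes arrays of input and output columns.
val encoder = new OneHotEncoderEstimator()
  .setInputCols(Array("category_idx"))
  .setOutputCols(Array("category_vec"))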