Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

AnalysisException: Queries with streaming sources must be executed with writeStream.start()

I'm receiving an exception which indicates that I need to start a stream in order to use it. However, the stream is being started. What's wrong with this setup?

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBootstrapServers)
  .option("subscribe", "inputTopic")
  .option("startingOffsets", "earliest")
  .load
  .selectExpr(deserializeKeyExpression, deserializeValueExpression)
  .select("value.*")
  .withColumn("date", to_timestamp(from_unixtime(col("date"))))
  .transform(model.transform)
  .select(col("id") as "key", func(col(config.probabilityCol)) as "value.prediction")
  .selectExpr(serializeKeyExpression, serializeValueExpression)
  .writeStream
  .outputMode("update")
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBootstrapServers)
  .option("checkpointLocation", "checkpoint")
  .option("topic", "outputTopic")
  .start

Here's the exception:

Caused by: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:374)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:37)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    ...
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:35)
    at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:51)
    at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:62)
    at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3249)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2491)
    at org.apache.spark.sql.Dataset.first(Dataset.scala:2498)
    at org.apache.spark.ml.feature.VectorAssembler.first$lzycompute$1(VectorAssembler.scala:57)
    at org.apache.spark.ml.feature.VectorAssembler.org$apache$spark$ml$feature$VectorAssembler$$first$1(VectorAssembler.scala:57)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply$mcI$sp(VectorAssembler.scala:88)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply(VectorAssembler.scala:88)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2$$anonfun$1.apply(VectorAssembler.scala:88)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:88)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:58)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
    at org.apache.spark.ml.feature.VectorAssembler.transform(VectorAssembler.scala:58)
    at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:306)
    at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:306)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
    at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
    at org.apache.spark.ml.PipelineModel.transform(Pipeline.scala:306)
    at com.company.project.Stream$$anonfun$transform$1.apply(NewsRateJob.scala:65)
    at com.company.project.Stream$$anonfun$transform$1.apply(NewsRateJob.scala:65)
    at org.apache.spark.sql.Dataset.transform(Dataset.scala:2513)
    at com.company.project.Stream.transform(NewsRateJob.scala:65)
    at com.company.project.Stream.setupStream(NewsRateJob.scala:47)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:366)
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:311)
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:134)
    ... 18 common frames omitted

I'm familiar with the issues of spark 2.2 and the VectorAssembler, however I am using spark 2.3.1.

like image 880
cscan Avatar asked Jul 06 '18 00:07

cscan


1 Answers

This exception occurred because the model was trying to access the data from the stream before the stream was started. In this case, the VectorAssembler was calling first on the dataset in order to determine how wide the vector is.

2.3 does not automatically fix problems with the VectorAssembler and structured streaming, it simply provides a class (specifically the VectorSizeHint class) which can be used in conjunction with the VectorAssembler with structured streaming. Adding this to the stages of the pipeline fixed the problem.

stages += new VectorSizeHint()
  .setInputCol(column)
  .setSize(size)

Here's some documentation which shows how it can be used: https://docs.databricks.com/spark/latest/mllib/mllib-pipelines-and-stuctured-streaming.html

Note: this is not required for the OneHotEncoderEstimator feature.

We experienced similar stack traces for a couple other reasons. One was caused because we were using a OneHotEstimator in our model (that needed to be updated to a OneHotEncoderEstimator), the other was caused because we were caching the pipeline (we removed the cache step).

like image 119
cscan Avatar answered Sep 30 '22 13:09

cscan