Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Queries with streaming sources must be executed with writeStream.start();

I'm trying to read the messages from kafka (version 10) in spark and trying to print it.

     import spark.implicits._           val spark = SparkSession               .builder               .appName("StructuredNetworkWordCount")               .config("spark.master", "local")               .getOrCreate()                val ds1 = spark.readStream.format("kafka")               .option("kafka.bootstrap.servers", "localhost:9092")                 .option("subscribe", "topicA")               .load()             ds1.collect.foreach(println)            ds1.writeStream            .format("console")            .start()             ds1.printSchema() 

getting an error Exception in thread "main"

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

like image 300
shivali Avatar asked Nov 15 '16 12:11

shivali


2 Answers

You are branching the query plan: from the same ds1 you are trying to:

  • ds1.collect.foreach(...)
  • ds1.writeStream.format(...){...}

But you are only calling .start() on the second branch, leaving the other dangling without a termination, which in turn throws the exception you are getting back.

The solution is to start both branches and await termination.

val ds1 = spark.readStream.format("kafka")   .option("kafka.bootstrap.servers", "localhost:9092")     .option("subscribe", "topicA")     .load() val query1 = ds1.collect.foreach(println)   .writeStream   .format("console")   .start() val query2 = ds1.writeStream   .format("console")   .start()  ds1.printSchema() query1.awaitTermination() query2.awaitTermination() 
like image 177
ssice Avatar answered Sep 20 '22 04:09

ssice


I struggled a lot with this issue. I tried each of suggested solution from various blog. But I my case there are few statement in between calling start() on query and finally at last i was calling awaitTerminate() function that cause this.

Please try in this fashion, It is perfectly working for me. Working example:

val query = df.writeStream       .outputMode("append")       .format("console")       .start().awaitTermination(); 

If you write in this way that will cause exception/ error:

val query = df.writeStream       .outputMode("append")       .format("console")       .start()      // some statement      // some statement       query.awaitTermination(); 

will throw given exception and will close your streaming driver.

like image 37
Rajeev Rathor Avatar answered Sep 17 '22 04:09

Rajeev Rathor