I'm trying to read messages from Kafka (version 10) in Spark and print them.
import spark.implicits._

val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .config("spark.master", "local")
  .getOrCreate()

val ds1 = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topicA")
  .load()

ds1.collect.foreach(println)

ds1.writeStream
  .format("console")
  .start()

ds1.printSchema()
I'm getting this error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
You are branching the query plan. From the same ds1 you are trying to run both:
ds1.collect.foreach(...)
ds1.writeStream.format(...).start()
But you only call .start() on the second branch. The first branch is left dangling: collect is a batch action, so it tries to execute the streaming source without going through writeStream.start(), which is what throws the exception you are getting.
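The solution is to give each branch its own streaming sink, start both, and await termination. Since collect cannot run on a streaming Dataset, the first branch becomes a writeStream query as well:

val ds1 = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topicA")
  .load()

// First branch: replaces ds1.collect.foreach(println)
val query1 = ds1.writeStream
  .format("console")
  .start()

// Second branch
val query2 = ds1.writeStream
  .format("console")
  .start()

ds1.printSchema()

// Block the main thread until the queries stop
query1.awaitTermination()
query2.awaitTermination()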
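If a single console query is all you need, the whole program can be sketched roughly as follows. This is a minimal sketch, not a definitive implementation: the object name KafkaConsoleSketch is hypothetical, and it assumes the spark-sql-kafka-0-10 connector is on the classpath and a broker is reachable at localhost:9092. The Kafka source delivers key and value as binary columns, so they are cast to strings before printing.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical object name, used only for illustration.
object KafkaConsoleSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("StructuredNetworkWordCount")
      .config("spark.master", "local")
      .getOrCreate()

    // Assumes the Kafka connector is available and topicA exists.
    val ds1 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "topicA")
      .load()

    // printSchema only inspects the plan, so it is safe on a streaming Dataset.
    ds1.printSchema()

    // Kafka key and value arrive as binary; cast them so the console shows text.
    val messages = ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    // No batch actions (collect, show, count) on the streaming Dataset:
    // the only way to execute it is writeStream ... start().
    val query = messages.writeStream
      .outputMode("append")
      .format("console")
      .start()

    // Keep the driver alive until the query is stopped or fails.
    query.awaitTermination()
  }
}
```

Each micro-batch is then printed to the console as it arrives, which covers what ds1.collect.foreach(println) was meant to do in the question.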
I struggled a lot with this issue and tried each of the suggested solutions from various blogs. In my case there were a few statements between calling start() on the query and finally calling awaitTermination(), and that is what caused it.
Please try it this way; it works perfectly for me. Working example:
val query = df.writeStream
  .outputMode("append")
  .format("console")
  .start()
  .awaitTermination();
Written this way, it causes the exception:
val query = df.writeStream
  .outputMode("append")
  .format("console")
  .start()

// some statement
// some statement

query.awaitTermination();
In my case this threw the exception above and shut down the streaming driver.