I am trying to write a test for the Spark Structured Streaming writeStream function, as shown below:
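val spark = SparkSession.builder().master("local").appName("spark session").getOrCreate()
// createDF is not part of core Spark; it looks like the spark-daria SparkSession extension
val lakeDF = spark.createDF(List(("hi")), List(("word", StringType, true)))
// lakeDF is a static (batch) DataFrame, so the writeStream call below raises the exception
lakeDF.writeStream
  .trigger(Trigger.Once())
  .format("parquet")
  .option("checkpointLocation", checkpointPath)
  .start(dataPath)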
But I am getting the following exception: org.apache.spark.sql.AnalysisException: 'writeStream' can be called only on streaming Dataset/DataFrame;
I am very new to Spark streaming. Please let me know how I can create a streaming DataFrame, or convert the regular DataFrame above into a streaming DataFrame, for my test suite.
Spark Streaming is an engine that processes data from sources in real time and writes the output to external storage systems. It has three major components: input sources, the streaming engine, and sinks. Input sources such as Kafka, Flume, and HDFS/S3 generate the data.
Streaming, Complete Output Mode: this mode is used only when the streaming data is aggregated. One example is counting the words in streaming data, merging the counts with those from earlier data, and writing the results to the sink.
Spark Streaming (the older DStream API) receives real-time data and divides it into smaller batches for the execution engine. In contrast, Structured Streaming is built on the Spark SQL API for data stream processing. In the end, all of these APIs are optimized by the Spark Catalyst optimizer and translated into RDDs for execution under the hood.
The output can be defined in different modes. Complete Mode: the entire updated Result Table is written to the external storage.
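To make Complete mode concrete, here is a minimal word-count sketch. It is only an illustration under some assumptions: it uses MemoryStream (an in-memory test source from Spark's internal org.apache.spark.sql.execution.streaming package) and the "memory" sink, and it assumes a Spark 2.x/3.x build where MemoryStream takes an implicit SQLContext. The object name and the query name "word_counts" are made up for the example.

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream

object CompleteModeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("complete-mode-sketch")
      .getOrCreate()
    import spark.implicits._
    // MemoryStream needs an implicit SQLContext (assumption: Spark 2.x/3.x API)
    implicit val sqlContext: SQLContext = spark.sqlContext

    // In-memory streaming source of text lines
    val lines = MemoryStream[String]

    // Streaming aggregation: split lines into words and count each word
    val counts = lines.toDS()
      .flatMap(_.split("\\s+"))
      .groupBy("value")
      .count()

    // Complete mode rewrites the whole Result Table on every trigger
    val query = counts.writeStream
      .outputMode("complete")
      .format("memory")          // in-memory sink, queryable as a table
      .queryName("word_counts")  // hypothetical table name
      .start()

    lines.addData("hi there", "hi again")
    query.processAllAvailable()  // block until the added data is processed

    spark.table("word_counts").show()

    query.stop()
    spark.stop()
  }
}
```

Complete mode only works here because the query contains an aggregation; without the groupBy, Spark would reject it and you would use append mode instead.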
In Spark Structured Streaming, streaming DataFrames/Datasets are created from a stream using readStream on the SparkSession. If a DataFrame/Dataset was not created from a stream, you are not allowed to write it using writeStream.
So create the DataFrames/Datasets using readStream and write them using writeStream:
val kafkaStream = sparkSession.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "kafka-broker-hostname:port")
  .option("subscribe", "topicname")
  .load()
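For a test suite you usually do not want a real Kafka broker. One option, sketched below, is MemoryStream, an in-memory streaming source that Spark's own tests use. Treat this as a sketch rather than the definitive approach: MemoryStream lives in an internal package (org.apache.spark.sql.execution.streaming), the code assumes a Spark 2.x/3.x build where it takes an implicit SQLContext, and the object name and temp directories are made up for the example.

```scala
import java.nio.file.Files

import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.Trigger

object WriteStreamTestSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("writeStream-test-sketch")
      .getOrCreate()
    import spark.implicits._
    // MemoryStream needs an implicit SQLContext (assumption: Spark 2.x/3.x API)
    implicit val sqlContext: SQLContext = spark.sqlContext

    // In-memory streaming source; addData plays the role of incoming records
    val input = MemoryStream[String]
    input.addData("hi")

    // A streaming DataFrame with the same single "word" column as in the question
    val lakeDF = input.toDF().withColumnRenamed("value", "word")
    assert(lakeDF.isStreaming)

    // Throwaway locations for the test run (hypothetical choice)
    val checkpointPath = Files.createTempDirectory("checkpoint").toString
    val dataPath = Files.createTempDirectory("data").toString

    val query = lakeDF.writeStream
      .trigger(Trigger.Once())
      .format("parquet")
      .option("checkpointLocation", checkpointPath)
      .start(dataPath)
    query.awaitTermination() // Trigger.Once processes available data, then stops

    // Read the sink back as a batch DataFrame and check the contents
    val written = spark.read.parquet(dataPath)
      .select("word").collect().map(_.getString(0)).toSeq
    assert(written == Seq("hi"))

    spark.stop()
  }
}
```

If you would rather avoid internal classes, the built-in "rate" source (spark.readStream.format("rate")) also gives you a streaming DataFrame without external systems, though you do not control the rows it produces.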