I am going through Spark Structured Streaming and encountered a problem.
With StreamingContext (DStreams), we can define a batch interval as follows:
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 5) # 5 second batch interval
How to do this in Structured Streaming?
My streaming code is something like:
from pyspark.sql import SparkSession

sparkStreaming = SparkSession \
    .builder \
    .appName("StreamExample1") \
    .getOrCreate()

stream_df = sparkStreaming.readStream \
    .schema("col0 STRING, col1 INTEGER") \
    .option("maxFilesPerTrigger", 1) \
    .csv("C:/sparkStream")

sql1 = stream_df.groupBy("col0").sum("col1")

query = sql1.writeStream.queryName("stream1").outputMode("complete").format("memory").start()
This code works as expected, but how/where do I define the batch interval here?
I am new to Structured Streaming; please guide me.
tl;dr Use trigger(...) (on the DataStreamWriter, i.e. after writeStream).
The Structured Streaming Programming Guide is an excellent source: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
There are various options; if you do not set a trigger, Spark looks for new data as soon as it has finished processing the previous batch. trigger is the way to go here.
From the manual:
The trigger settings of a streaming query define the timing of streaming data processing, whether the query is going to be executed as a micro-batch query with a fixed batch interval or as a continuous processing query.
Some examples:
# Default trigger (runs a micro-batch as soon as the previous one finishes)
df.writeStream \
    .format("console") \
    .start()

# ProcessingTime trigger with a two-second micro-batch interval
df.writeStream \
    .format("console") \
    .trigger(processingTime='2 seconds') \
    .start()

# One-time trigger (processes available data once, then stops)
df.writeStream \
    .format("console") \
    .trigger(once=True) \
    .start()

# Continuous trigger with a one-second checkpointing interval
df.writeStream \
    .format("console") \
    .trigger(continuous='1 second') \
    .start()
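Applied to your query, a minimal sketch (keeping the schema, path, query name, and output mode from your question, and assuming you want the same 5-second interval as your DStreams example) could look like this:

# Hypothetical adaptation of the query from the question:
# a 5-second micro-batch interval via trigger(processingTime=...)
query = sql1.writeStream \
    .queryName("stream1") \
    .outputMode("complete") \
    .format("memory") \
    .trigger(processingTime='5 seconds') \
    .start()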