
How to specify batch interval in Spark Structured Streaming?

I am going through Spark Structured Streaming and encountered a problem.

With DStreams (StreamingContext), we can define the batch interval as follows:

from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 5) # 5 second batch interval

How do I do this in Structured Streaming?

My streaming code looks something like this:

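from pyspark.sql import SparkSession

sparkStreaming = SparkSession \
    .builder \
    .appName("StreamExample1") \
    .getOrCreate()

# Read CSV files from the directory as a stream, at most one file per micro-batch
stream_df = sparkStreaming.readStream \
    .schema("col0 STRING, col1 INTEGER") \
    .option("maxFilesPerTrigger", 1) \
    .csv("C:/sparkStream")

# Running sum of col1 per col0, written to an in-memory table named "stream1"
sql1 = stream_df.groupBy("col0").sum("col1")
query = sql1.writeStream.queryName("stream1").outputMode("complete").format("memory").start()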

This code works as expected, but how and where do I define the batch interval here?

I am new to Structured Streaming, please guide me.

Obsidian asked Sep 02 '19 17:09


1 Answer

tl;dr Use trigger(...) (on the DataStreamWriter, i.e. after writeStream)


The Structured Streaming Programming Guide is an excellent source: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html.

There are several options. If you do not set a batch interval, Spark looks for new data as soon as it has finished processing the previous batch. A trigger is the way to go here.

From the manual:

The trigger settings of a streaming query define the timing of streaming data processing, whether the query is going to be executed as micro-batch query with a fixed batch interval or as a continuous processing query.
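
To make the difference between the default trigger and a fixed batch interval concrete, here is a small pure-Python model of when micro-batches would start. It is only a sketch and does not call Spark: the function name micro_batch_starts and the batch durations are made up for illustration, and it assumes new data is always available and that a batch which overruns the interval is followed immediately by the next one, after which batches line up with interval boundaries again.

```python
def micro_batch_starts(durations, interval=0):
    """Return the start time (in seconds) of each micro-batch.

    durations: processing time of each batch in seconds (illustrative values).
    interval:  processing-time trigger interval in seconds;
               0 models the default trigger (start as soon as possible).
    """
    starts = []
    start = 0
    for duration in durations:
        starts.append(start)
        finished = start + duration
        if interval > 0:
            # Wait for the next interval boundary, unless the batch
            # overran it; in that case start right after it finishes.
            next_boundary = (start // interval + 1) * interval
            start = max(finished, next_boundary)
        else:
            # Default trigger: no waiting between batches.
            start = finished
    return starts


durations = [1, 1, 7, 1, 1]
print(micro_batch_starts(durations))              # [0, 1, 2, 9, 10]
print(micro_batch_starts(durations, interval=5))  # [0, 5, 10, 17, 20]
```

With the 5-second interval, the third batch takes 7 seconds, so the fourth starts late at 17 instead of 15, and the fifth is back on a boundary at 20.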

Some examples:

Default trigger (runs micro-batch as soon as it can)

df.writeStream \
  .format("console") \
  .start()

ProcessingTime trigger with two-seconds micro-batch interval

df.writeStream \
  .format("console") \
  .trigger(processingTime='2 seconds') \
  .start()

One-time trigger

df.writeStream \
  .format("console") \
  .trigger(once=True) \
  .start()

Continuous trigger with one-second checkpointing interval

df.writeStream \
  .format("console") \
  .trigger(continuous='1 second') \
  .start()
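
Applied to the query in the question, a 5-second interval comparable to StreamingContext(sc, 5) might look like the sketch below. The helper names batch_interval and start_with_interval are hypothetical and not part of the Spark API; df is assumed to be a streaming DataFrame such as sql1 from the question, and the sink settings are copied from the question's writeStream call.

```python
def batch_interval(seconds):
    """Format a DStream-style batch interval (seconds) as a trigger interval string."""
    return f"{seconds} seconds"


def start_with_interval(df, seconds, name="stream1"):
    """Start the question's in-memory query with a fixed micro-batch interval.

    df is assumed to be a streaming DataFrame (for example sql1 above).
    """
    return (
        df.writeStream
        .queryName(name)
        .outputMode("complete")
        .format("memory")
        # trigger() is set on the DataStreamWriter, before start()
        .trigger(processingTime=batch_interval(seconds))
        .start()
    )
```

Usage would then be query = start_with_interval(sql1, 5), which is the same as adding .trigger(processingTime='5 seconds') before .start() in the original chain.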

thebluephantom answered Sep 21 '22 15:09