I use Spark 2.2.0-rc1.

I've got a Kafka topic over which I'm running a watermarked aggregation, with a 1-minute watermark, writing to the console sink in append output mode.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val schema = StructType(StructField("time", TimestampType) :: Nil)
val q = spark.
  readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "localhost:9092").
  option("startingOffsets", "earliest").
  option("subscribe", "topic").
  load.
  select(from_json(col("value").cast("string"), schema).as("value")).
  select("value.*").
  withWatermark("time", "1 minute").
  groupBy("time").
  count.
  writeStream.
  outputMode("append").
  format("console").
  start
I am pushing the following data into the Kafka topic:
{"time":"2017-06-07 10:01:00.000"}
{"time":"2017-06-07 10:02:00.000"}
{"time":"2017-06-07 10:03:00.000"}
{"time":"2017-06-07 10:04:00.000"}
{"time":"2017-06-07 10:05:00.000"}
And I am getting the following output:
scala> -------------------------------------------
Batch: 0
-------------------------------------------
+----+-----+
|time|count|
+----+-----+
+----+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+----+-----+
|time|count|
+----+-----+
+----+-----+
-------------------------------------------
Batch: 2
-------------------------------------------
+----+-----+
|time|count|
+----+-----+
+----+-----+
-------------------------------------------
Batch: 3
-------------------------------------------
+----+-----+
|time|count|
+----+-----+
+----+-----+
-------------------------------------------
Batch: 4
-------------------------------------------
+----+-----+
|time|count|
+----+-----+
+----+-----+
Is this expected behaviour?
Pushing more data to Kafka should trigger Spark to output something. The current behavior is entirely due to the internal implementation.

When you push some data, StreamingQuery will generate a batch to run. When this batch finishes, it remembers the max event time seen in that batch. Then, in the next batch, because you are using append mode, StreamingQuery will use that max event time and the watermark to evict old values from the StateStore and output them. Therefore you need to generate at least two batches in order to see any output.