The structured streaming code below watermarks and windows the data into 24-hour windows that slide every 15 minutes. In Append output mode it produces only an empty Batch 0, while in Update mode the results are displayed correctly. Append mode is needed because the S3 sink supports only Append mode.
String windowDuration = "24 hours";
String slideDuration = "15 minutes";

Dataset<Row> sliding24h = rowData
        .withWatermark(eventTimeCol, slideDuration)
        .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration),
                 col(nameCol))
        .count();

sliding24h
        .writeStream()
        .format("console")
        .option("truncate", false)
        .option("numRows", 1000)
        .outputMode(OutputMode.Append())
        //.outputMode(OutputMode.Complete())
        .start()
        .awaitTermination();
Below is the complete test code:
import java.util.ArrayList;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.streaming.MemoryStream;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQueryException;
import scala.collection.JavaConversions;

import static org.apache.spark.sql.functions.col;

public class SlidingWindowTest { // class name is arbitrary

    public static void main(String[] args) throws StreamingQueryException {
        SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

        // 200 records, 5 minutes apart, spanning roughly 16.5 hours of event time
        ArrayList<String> rl = new ArrayList<>();
        for (int i = 0; i < 200; ++i) {
            long t = 1512164314L + i * 5 * 60;
            rl.add(t + ",qwer");
        }

        String nameCol = "name";
        String eventTimeCol = "eventTime";
        String eventTimestampCol = "eventTimestamp";

        // All records are made available to the very first micro-batch
        MemoryStream<String> input = new MemoryStream<>(42, spark.sqlContext(), Encoders.STRING());
        input.addData(JavaConversions.asScalaBuffer(rl).toSeq());

        Dataset<Row> stream = input.toDF().selectExpr(
                "cast(split(value,'[,]')[0] as long) as " + eventTimestampCol,
                "cast(split(value,'[,]')[1] as String) as " + nameCol);
        System.out.println("isStreaming: " + stream.isStreaming());

        Column eventTime = functions.to_timestamp(col(eventTimestampCol));
        Dataset<Row> rowData = stream.withColumn(eventTimeCol, eventTime);

        String windowDuration = "24 hours";
        String slideDuration = "15 minutes";

        // 24-hour windows sliding every 15 minutes, watermarked by 15 minutes
        Dataset<Row> sliding24h = rowData
                .withWatermark(eventTimeCol, slideDuration)
                .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration),
                         col(nameCol))
                .count();

        sliding24h
                .writeStream()
                .format("console")
                .option("truncate", false)
                .option("numRows", 1000)
                .outputMode(OutputMode.Append())
                //.outputMode(OutputMode.Complete())
                .start()
                .awaitTermination();
    }
}
This is a bug that has been resolved in Spark 2.4.0. See: https://issues.apache.org/jira/browse/SPARK-26167 and https://issues.apache.org/jira/browse/SPARK-24156
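For context (an illustration, not part of the original answer): in Append mode a windowed count is emitted only once the watermark passes the window's end, and the watermark itself is advanced only at the end of a micro-batch. The question feeds all 200 records in a single batch, and before the SPARK-24156 fix Spark never runs a follow-up batch without new data, so nothing is ever finalized. A minimal sketch of observing this with the same MemoryStream setup is below: it reuses rl, input and sliding24h from the code above, the names firstHalf, secondHalf and query are just illustrative, and it additionally needs java.util.List and org.apache.spark.sql.streaming.StreamingQuery imports.

// Sketch only: replace the single addData(...) call and the
// writeStream()...awaitTermination() chain above with the following.
List<String> firstHalf = rl.subList(0, 100);
List<String> secondHalf = rl.subList(100, 200);

// Batch 0 will only see the first half of the records.
input.addData(JavaConversions.asScalaBuffer(firstHalf).toSeq());

StreamingQuery query = sliding24h
        .writeStream()
        .format("console")
        .option("truncate", false)
        .option("numRows", 1000)
        .outputMode(OutputMode.Append())
        .start();

// Batch 0: the watermark is still at its initial value (1970-01-01),
// so no window is considered closed and Append emits nothing.
query.processAllAvailable();

// Batch 1: the watermark is now max(event time of batch 0) - 15 minutes,
// so windows that end before it are finalized and appended to the output.
input.addData(JavaConversions.asScalaBuffer(secondHalf).toSeq());
query.processAllAvailable();

query.stop();

On Spark 2.4.0+ this split should not be necessary: with SPARK-24156 the engine runs an extra no-data micro-batch after the watermark advances, so the original single-batch code can emit its closed windows as well.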