
Spark watermark and windowing in Append mode

The structured streaming code below watermarks and windows data over a 24-hour interval with 15-minute slides. The code produces only an empty Batch 0 in Append mode; in Update mode the results are displayed correctly. Append mode is needed because the S3 sink works only in Append mode.

String windowDuration = "24 hours";
String slideDuration = "15 minutes";
Dataset<Row> sliding24h = rowData
        .withWatermark(eventTimeCol, slideDuration)
        .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration),
                col(nameCol)).count();

sliding24h
        .writeStream()
        .format("console")
        .option("truncate", false)
        .option("numRows", 1000)
        .outputMode(OutputMode.Append())
        //.outputMode(OutputMode.Complete())
        .start()
        .awaitTermination();

Below is the complete test code:

import java.util.ArrayList;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.streaming.MemoryStream;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQueryException;
import scala.collection.JavaConversions;

import static org.apache.spark.sql.functions.col;

public static void main(String[] args) throws StreamingQueryException {
     SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

     ArrayList<String> rl = new ArrayList<>();
     for (int i = 0; i < 200; ++i) {
         long t = 1512164314L + i * 5 * 60;
         rl.add(t + ",qwer");
     }

     String nameCol = "name";
     String eventTimeCol = "eventTime";
     String eventTimestampCol = "eventTimestamp";

     MemoryStream<String> input = new MemoryStream<>(42, spark.sqlContext(), Encoders.STRING());
     input.addData(JavaConversions.asScalaBuffer(rl).toSeq());
     Dataset<Row> stream = input.toDF().selectExpr(
             "cast(split(value,'[,]')[0] as long) as " + eventTimestampCol,
             "cast(split(value,'[,]')[1] as String) as " + nameCol);

     System.out.println("isStreaming: " +  stream.isStreaming());

     Column eventTime = functions.to_timestamp(col(eventTimestampCol));
     Dataset<Row> rowData = stream.withColumn(eventTimeCol, eventTime);

     String windowDuration = "24 hours";
     String slideDuration = "15 minutes";
     Dataset<Row> sliding24h = rowData
             .withWatermark(eventTimeCol, slideDuration)
             .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration),
                     col(nameCol)).count();

     sliding24h
             .writeStream()
             .format("console")
             .option("truncate", false)
             .option("numRows", 1000)
             .outputMode(OutputMode.Append())
             //.outputMode(OutputMode.Complete())
             .start()
             .awaitTermination();
}
asked Nov 23 '18 by dejan


1 Answer

This is a bug that has been resolved in Spark 2.4.0. See: https://issues.apache.org/jira/browse/SPARK-26167 and https://issues.apache.org/jira/browse/SPARK-24156
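
Once on Spark 2.4.0+, the same aggregation can be written to the intended S3 sink in Append mode. Below is a minimal sketch; the s3a bucket path and checkpoint location are placeholder values, and keep in mind that in Append mode the rows for a window are only emitted after the watermark passes the end of that window.

// Sketch: the same sliding24h aggregation written to S3 as Parquet in Append mode.
// The s3a paths are placeholders, not part of the original question.
sliding24h
        .writeStream()
        .format("parquet")
        .option("path", "s3a://my-bucket/sliding24h/")
        .option("checkpointLocation", "s3a://my-bucket/checkpoints/sliding24h/")
        .outputMode(OutputMode.Append())
        .start()
        .awaitTermination();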

answered Oct 30 '22 by dejan