
How does "startTimestamp" option work for the rate-micro-batch format?

This is on Spark 3.5.0; I haven't tried other versions.

I wrote a simple Spark streaming app using the rate-micro-batch format, which generates test data.

According to this guide, it has an option startTimestamp, which is the starting value of the generated time. But changing this option doesn't seem to do anything: I tried setting it to different values, and the starting time is always around 1970-01-01.

Am I not understanding something, or is this a bug?

package org.example;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;
import static org.apache.spark.sql.functions.*;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class StreamingSparkPartitioned {
   public static void main(String[] args) throws TimeoutException, StreamingQueryException {
      SparkSession spark = SparkSession.builder()
         .master("local[*]")
         .getOrCreate();

      Column expression =  when(expr("value % 3 = 1"), "stupid_event").otherwise(
         when(expr("value % 3 = 2"), "smart_event")
            .otherwise("neutral_event"));

      DataStreamWriter streamingDF = spark.readStream()
         .format("rate-micro-batch")
         .option("rowsPerBatch", "100")
         .option("startTimestamp", "10000")
         .load()
         .withColumn("event_type", expression)
         .writeStream()
         .option("checkpointLocation", "C:\\Users\\wnwnn\\Desktop\\checkpoint");

      streamingDF.outputMode(OutputMode.Append())
         .format("console")
         .trigger(Trigger.ProcessingTime(1, TimeUnit.SECONDS))
         .start()
         .awaitTermination();
   }
}
pavel_orekhov asked Jun 04 '26 12:06



1 Answer

I think startTimestamp is in milliseconds, so setting it to 10000 gives you a start time 10 seconds after the Unix epoch (1970-01-01T00:00:00Z), which still comes out as 1970-01-01.

If you set it to a much higher value like 1234567890123, it would give you a date in 2009:

  DataStreamWriter streamingDF = spark.readStream()
     .format("rate-micro-batch")
     .option("rowsPerBatch", "100")
     .option("startTimestamp", "1234567890123")
     .load()
     .withColumn("event_type", expression)
     .writeStream()
     .option("checkpointLocation", "C:\\Users\\wnwnn\\Desktop\\checkpoint");

I've not tested that to confirm yet but will do when I get a moment.
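
To make the unit concrete, here is a small standalone Java sketch (no Spark needed). It assumes startTimestamp is interpreted as milliseconds since the Unix epoch, which I haven't confirmed against the Spark source. The class and method names are made up for illustration. It shows what 10000 and 1234567890123 turn into, and how you could work out the value to pass for a date you actually want:

```java
import java.time.Instant;

public class StartTimestampMillis {
    // Interprets a number as milliseconds since the Unix epoch.
    // Assumption: this is the unit the startTimestamp option uses.
    static Instant asEpochMillis(long value) {
        return Instant.ofEpochMilli(value);
    }

    // Builds the string you would pass to .option("startTimestamp", ...)
    // for a given UTC instant written in ISO-8601 form.
    static String optionValueFor(String isoInstant) {
        return String.valueOf(Instant.parse(isoInstant).toEpochMilli());
    }

    public static void main(String[] args) {
        System.out.println(asEpochMillis(10000L));                  // 1970-01-01T00:00:10Z
        System.out.println(asEpochMillis(1234567890123L));          // 2009-02-13T23:31:30.123Z
        System.out.println(optionValueFor("2024-01-01T00:00:00Z")); // 1704067200000
    }
}
```

So if you want the generated timestamps to start at the beginning of 2024 (UTC), the option value would be "1704067200000" rather than something small like "10000".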

Shaun answered Jun 06 '26 01:06
