This is for Spark 3.5.0; I haven't tried other versions.
I wrote a simple Spark streaming app using the rate-micro-batch format, which generates test data.
According to this guide, it has an option startTimestamp, which is the starting value of the generated time. But changing this option doesn't seem to do anything: I tried setting it to different values, and the starting time is always around 1970-01-01.
Am I not understanding something, or is this a bug?
package org.example;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;

import static org.apache.spark.sql.functions.*;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class StreamingSparkPartitioned {
    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .getOrCreate();

        Column expression = when(expr("value % 3 = 1"), "stupid_event").otherwise(
                when(expr("value % 3 = 2"), "smart_event")
                        .otherwise("neutral_event"));

        DataStreamWriter streamingDF = spark.readStream()
                .format("rate-micro-batch")
                .option("rowsPerBatch", "100")
                .option("startTimestamp", "10000")
                .load()
                .withColumn("event_type", expression)
                .writeStream()
                .option("checkpointLocation", "C:\\Users\\wnwnn\\Desktop\\checkpoint");

        streamingDF.outputMode(OutputMode.Append())
                .format("console")
                .trigger(Trigger.ProcessingTime(1, TimeUnit.SECONDS))
                .start()
                .awaitTermination();
    }
}
I think startTimestamp is in milliseconds, so setting it to 10000 gives you 10 seconds after the 1970 epoch, which will still come out as 1970-01-01.
If you set it to a much higher value, such as 1234567890123, it should give you a date in 2009 (2009-02-13 in UTC):
DataStreamWriter streamingDF = spark.readStream()
        .format("rate-micro-batch")
        .option("rowsPerBatch", "100")
        .option("startTimestamp", "1234567890123")
        .load()
        .withColumn("event_type", expression)
        .writeStream()
        .option("checkpointLocation", "C:\\Users\\wnwnn\\Desktop\\checkpoint");
I've not tested that to confirm yet but will do when I get a moment.
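In the meantime, here is a quick way to sanity-check the arithmetic without running Spark at all. It is only a sketch, and it assumes (as above) that startTimestamp is read as milliseconds since the Unix epoch. The class and method names are made up for illustration; the only API used is java.time.Instant.

```java
import java.time.Instant;

public class StartTimestampCheck {
    // Interprets a value as milliseconds since the Unix epoch (UTC).
    static Instant fromEpochMillis(long millis) {
        return Instant.ofEpochMilli(millis);
    }

    // Converts an ISO-8601 UTC instant into an epoch-millis string,
    // i.e. the form you would pass to .option("startTimestamp", ...)
    // under the assumption that the option is in milliseconds.
    static String toStartTimestamp(String isoInstant) {
        return Long.toString(Instant.parse(isoInstant).toEpochMilli());
    }

    public static void main(String[] args) {
        // The value from the question: only 10 seconds past the epoch.
        System.out.println(fromEpochMillis(10000L));          // 1970-01-01T00:00:10Z
        // The larger value suggested above.
        System.out.println(fromEpochMillis(1234567890123L));  // 2009-02-13T23:31:30.123Z
        // Going the other way: pick a date, get the option value.
        System.out.println(toStartTimestamp("2024-01-01T00:00:00Z")); // 1704067200000
    }
}
```

If the assumption holds, passing the string from toStartTimestamp as the startTimestamp option should make the generated timestamps start at the chosen date. Note that the console sink may display them in your session time zone rather than UTC.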