Spark structured streaming query always starts with auto.offset.rest=earliest even though auto.offset.reset=latest is set

I have a weird issue with trying to read data from Kafka using Spark structured streaming. My use case is to be able to read from a topic from the largest/latest offset available.

My read configs:

val data = spark
     .readStream
     .format("kafka")
     .option("kafka.bootstrap.servers", "some xyz server")
     .option("subscribe", "sampletopic")
     .option("auto.offset.reset", "latest")
     .option("startingOffsets", "latest")
     .option("kafkaConsumer.pollTimeoutMs", 20000)
     .option("failOnDataLoss","false")
     .option("maxOffsetsPerTrigger",20000)
     .load()

My write configs:

data
    .writeStream
    .outputMode("append")
    .queryName("test") 
    .format("parquet")
    .option("checkpointLocation", "s3://somecheckpointdir")
    .start("s3://outpath").awaitTermination()

Versions used:

  • spark-core_2.11 : 2.2.1
  • spark-sql_2.11 : 2.2.1
  • spark-sql-kafka-0-10_2.11 : 2.2.1

I have done my research online and in [the Kafka documentation](https://kafka.apache.org/0100/documentation.html).

I am using the new consumer APIs and, as the documentation suggests, I just need to set auto.offset.reset to "latest" or startingOffsets to "latest" to ensure that my Spark job starts consuming from the latest offset available per partition in Kafka.

I am also aware that the setting auto.offset.reset only kicks in when a new query is started for the first time and not on a restart of an application in which case it will continue to read from the last saved offset.

I am using S3 for checkpointing my offsets, and I see them being generated under s3://somecheckpointdir.

The issue I am facing is that the Spark job always reads from the earliest offset when the application is started for the first time, even though the latest option is specified in the code. I see auto.offset.reset = earliest being used in the Spark logs. I have not seen posts related to this particular issue.

I would like to know if I am missing something here and if someone has seen this behavior before. Any help/direction will indeed be useful. Thank you.

Karthik Reddy asked Mar 07 '23 21:03
1 Answer

  1. All Kafka configurations should be set with kafka. prefix. Hence the correct option key is kafka.auto.offset.reset.
  2. You should never set auto.offset.reset. Instead, "set the source option startingOffsets to specify where to start instead. Structured Streaming manages which offsets are consumed internally, rather than rely on the kafka Consumer to do it. This will ensure that no data is missed when new topics/partitions are dynamically subscribed. Note that startingOffsets only applies when a new streaming query is started, and that resuming will always pick up from where the query left off." [1]

[1] http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#kafka-specific-configurations
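Putting the answer into practice, the read block from the question would become something like the sketch below (keeping the question's placeholder server and topic names): drop auto.offset.reset entirely and let startingOffsets control where a brand-new query begins.

```scala
// Sketch of the corrected read config: no auto.offset.reset option at all.
// startingOffsets = "latest" only applies the first time this query runs;
// on restart, Structured Streaming resumes from the checkpointed offsets.
val data = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "some xyz server")
  .option("subscribe", "sampletopic")
  .option("startingOffsets", "latest")
  .option("kafkaConsumer.pollTimeoutMs", 20000)
  .option("failOnDataLoss", "false")
  .option("maxOffsetsPerTrigger", 20000)
  .load()
```

Note that because resuming always picks up from the checkpoint, the query must be started against a fresh checkpoint location for startingOffsets to take effect.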

zsxwing answered Mar 10 '23 11:03