I have a weird issue with trying to read data from Kafka using Spark structured streaming. My use case is to be able to read from a topic from the largest/latest offset available.
My read configs:
```scala
val data = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "some xyz server")
  .option("subscribe", "sampletopic")
  .option("auto.offset.reset", "latest")
  .option("startingOffsets", "latest")
  .option("kafkaConsumer.pollTimeoutMs", 20000)
  .option("failOnDataLoss", "false")
  .option("maxOffsetsPerTrigger", 20000)
  .load()
```
My write configs:
```scala
data
  .writeStream
  .outputMode("append")
  .queryName("test")
  .format("parquet")
  .option("checkpointLocation", "s3://somecheckpointdir")
  .start("s3://outpath")
  .awaitTermination()
```
Versions used:
spark-core_2.11 : 2.2.1
spark-sql_2.11 : 2.2.1
spark-sql-kafka-0-10_2.11 : 2.2.1
I have done my research online and consulted [the Kafka documentation](https://kafka.apache.org/0100/documentation.html). I am using the new consumer APIs, and as the documentation suggests, I just need to set `auto.offset.reset` to "latest" or `startingOffsets` to "latest" to ensure that my Spark job starts consuming from the latest offset available per partition in Kafka. I am also aware that `auto.offset.reset` only kicks in when a new query is started for the first time, not on a restart of the application, in which case it will continue to read from the last saved offset.
I am using S3 for checkpointing my offsets, and I see them being generated under s3://somecheckpointdir.
The issue I am facing is that the Spark job always reads from the earliest offset, even though "latest" is specified in the code, when the application is started for the first time. In the Spark logs I see

```
auto.offset.reset = earliest
```

being used. I have not seen posts related to this particular issue.
I would like to know if I am missing something here and if someone has seen this behavior before. Any help/direction will indeed be useful. Thank you.
---

Kafka's own consumer configurations must be passed with the `kafka.` prefix, so the correct option key would be `kafka.auto.offset.reset`, not `auto.offset.reset`. However, Spark does not allow you to set `kafka.auto.offset.reset` either. Instead, "set the source option `startingOffsets` to specify where to start instead. Structured Streaming manages which offsets are consumed internally, rather than rely on the kafka Consumer to do it. This will ensure that no data is missed when new topics/partitions are dynamically subscribed. Note that `startingOffsets` only applies when a new streaming query is started, and that resuming will always pick up from where the query left off." [1]

[1] http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#kafka-specific-configurations
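Under that guidance, a minimal sketch of the corrected read configuration might look like the following (the server and topic names are the placeholders from the question); the unsupported `auto.offset.reset` option is simply dropped and only `startingOffsets` is kept:

```scala
// Sketch: same read config as in the question, minus the disallowed
// consumer option. Structured Streaming tracks offsets in the checkpoint,
// so "startingOffsets" only takes effect the very first time the query runs.
val data = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "some xyz server")
  .option("subscribe", "sampletopic")
  .option("startingOffsets", "latest") // first run only; restarts resume from the checkpoint
  .option("kafkaConsumer.pollTimeoutMs", 20000)
  .option("failOnDataLoss", "false")
  .option("maxOffsetsPerTrigger", 20000)
  .load()
```

Note that if the query has already run once, resuming from the checkpoint always wins over `startingOffsets`; to genuinely start over from the latest offsets you would have to clear the checkpoint directory.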