I am using Spark Structured Streaming to consume events from Kafka and upload them to S3.
Checkpoints are stored on S3:
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.Trigger;

DataStreamWriter<Row> writer = input.writeStream()
        .format("orc")
        .trigger(Trigger.ProcessingTime(config.getProcessingTime()))
        .outputMode(OutputMode.Append())
        .option("truncate", false)
        .option("checkpointLocation", "s3://bucket1")
        .option("compression", "zlib")
        .option("path", "s3://bucket2");
The offsets are committed back to Kafka via a StreamingQueryListener:
kafkaConsumer.commitSync(topicPartitionMap);
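A minimal sketch of such a listener, assuming the end offsets are parsed out of each progress event with Jackson and committed with an already-configured KafkaConsumer (the class name and parsing details are illustrative, not a fixed API):

import java.util.HashMap;
import java.util.Map;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.sql.streaming.SourceProgress;
import org.apache.spark.sql.streaming.StreamingQueryListener;

public class KafkaOffsetCommitListener extends StreamingQueryListener {
    private final ObjectMapper mapper = new ObjectMapper();
    // Note: KafkaConsumer is not thread-safe; this sketch assumes it is only used from this listener.
    private final KafkaConsumer<String, String> kafkaConsumer;

    public KafkaOffsetCommitListener(KafkaConsumer<String, String> kafkaConsumer) {
        this.kafkaConsumer = kafkaConsumer;
    }

    @Override
    public void onQueryProgress(QueryProgressEvent event) {
        Map<TopicPartition, OffsetAndMetadata> topicPartitionMap = new HashMap<>();
        for (SourceProgress source : event.progress().sources()) {
            // endOffset() is a JSON string such as {"topic1":{"0":1234,"1":5678}}
            try {
                Map<String, Map<String, Long>> offsets = mapper.readValue(
                        source.endOffset(),
                        new TypeReference<Map<String, Map<String, Long>>>() {});
                offsets.forEach((topic, partitions) ->
                        partitions.forEach((partition, offset) ->
                                topicPartitionMap.put(
                                        new TopicPartition(topic, Integer.parseInt(partition)),
                                        new OffsetAndMetadata(offset))));
            } catch (Exception e) {
                // skip sources whose offsets cannot be parsed (e.g. non-Kafka sources)
            }
        }
        if (!topicPartitionMap.isEmpty()) {
            kafkaConsumer.commitSync(topicPartitionMap);
        }
    }

    @Override
    public void onQueryStarted(QueryStartedEvent event) { }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent event) { }
}

The listener is registered with sparkSession.streams().addListener(...) before the query is started.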
Once the application starts, it retrieves the offset map from Kafka and starts the stream:
import org.apache.spark.sql.Dataset;

// startingOffsetsJson is a JSON string such as {"topic1":{"0":23,"1":456}},
// built from the offsets previously committed to Kafka (see the sketch below).
Dataset<Row> reader = sparkSession
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", config.getKafkaBootStrapServers())
        .option("subscribe", "topic1")
        .option("kafka.max.poll.records", 1000)   // Kafka consumer settings need the kafka. prefix
        .option("failOnDataLoss", false)
        .option("startingOffsets", startingOffsetsJson)
        .load();
I store the topic/partition/offset with the data in the ORC files.
The data contains multiple duplicates of events with exactly the same topic/partition/offset.
How should the stream be configured to achieve exactly-once processing?
I found out that these parameters should be set to true:
spark.streaming.driver.writeAheadLog.closeFileAfterWrite
spark.streaming.receiver.writeAheadLog.closeFileAfterWrite
As the Spark configuration docs put it: set this to 'true' when you want to use S3 for the metadata WAL (see the snippet after the links below).
https://spark.apache.org/docs/latest/configuration.html
More details here: https://www.waitingforcode.com/apache-spark-streaming/spark-streaming-configuration/read
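For example, a sketch of setting them when building the SparkSession (the app name is illustrative):

import org.apache.spark.sql.SparkSession;

SparkSession sparkSession = SparkSession.builder()
        .appName("kafka-to-s3")  // illustrative name
        .config("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
        .config("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
        .getOrCreate();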