Logo Questions Linux Laravel Mysql Ubuntu Git Menu

Spark structured streaming exactly once - Not achieved - Duplicated events

I am using Spark Structured Streaming to consume events from Kafka and upload them to S3.

Checkpoints are committed on S3:

DataFrameWriter<Row> writer = input.writeStream()
           .option("truncate", false)           
           .option("checkpointLocation", "s3://bucket1")
           .option("compression", "zlib")
           .option("path", "s3://bucket2");

The offsets are committed to Kafka via StreamingQueryListener :


Once the application is started it retrieves the offset map from Kafka and start the stream:

 reader = sparkSession
            .option("kafka.bootstrap.servers", config.getKafkaBootStrapServers())
            .option("subscribe", "topic1")
            .option("max.poll.records", 1000)
            .option("failOnDataLoss", false)
            .option("startingOffsets", topicPartitionMap)

I store the topic/partition/offset with the data in the ORC files.

The data contain multiple duplicates of the events with exact topic/partition/offset.

How the stream should be configured to achieve exactly once processing ?

like image 748
Alex Stanovsky Avatar asked Mar 21 '19 14:03

Alex Stanovsky

1 Answers

Found out that those parameters should be set to true spark.streaming.driver.writeAheadLog.closeFileAfterWrite and spark.streaming.receiver.writeAheadLog.closeFileAfterWrite

Set this to 'true' when you want to use S3 for the metadata WAL


More details here: https://www.waitingforcode.com/apache-spark-streaming/spark-streaming-configuration/read?fbclid=IwAR17x1AfTLH1pjq1QPkDsQT6DU4hgi7WNeIYUnw25Hvquoj-4yQU10R0GeM

like image 181
Alex Stanovsky Avatar answered Sep 20 '22 06:09

Alex Stanovsky