I've setup a Flink 1.2 standalone cluster with 2 JobManagers and 3 TaskManagers and I'm using JMeter to load-test it by producing Kafka messages / events which are then processed. The processing job runs on a TaskManager and it usually takes ~15K events/s.
The job has set EXACTLY_ONCE checkpointing and is persisting state and checkpoints to Amazon S3.
If I shutdown the TaskManager running the job it takes a bit, a few seconds, then the job is resumed on a different TaskManager. The job mainly logs the event ids which are consecutive integers (e.g. from 0 to 1200000).
When I check the output on the TaskManager I shut down the last count is for example 500000, then when I check the output on the resumed job on a different TaskManager it starts with ~ 400000. This means ~100K of duplicated events. This number is dependent on the speed of the test can be higher or lower.
Not sure if I'm missing something but I would expect the job to display the next consecutive number (like 500001) after resuming on the different TaskManager.
Does anyone know why this is happening / extra settings I have to configure to obtain the exactly once?
Apache Flink is used for performing stateful computations on streaming data because of its low latency, reliability and exactly-once characteristics.
For the producer side, Flink use two-phase commit [1] to achieve exactly-once. Roughly Flink Producer would relies on Kafka's transaction to write data, and only commit data formally after the transaction is committed. Users could use Semantics. EXACTLY_ONCE to enable this functionality.
Flink uses a KafkaConsumer with “read_committed” mode enabled, where it will only read transactional messages. This feature was enabled at Uber as a direct result of the work discussed in this blog. Secondly, we generate unique identifiers for every record produced by the Aggregation job which will be detailed below.
Apache Flink is a distributed system and requires compute resources in order to execute applications. Flink integrates with all common cluster resource managers such as Hadoop YARN and Kubernetes, but can also be setup to run as a stand-alone cluster.
You are seeing the expected behavior for exactly-once. Flink implements fault-tolerance via a combination of checkpointing and replay in the case of failures. The guarantee is not that each event will be sent into the pipeline exactly once, but rather that each event will affect your pipeline's state exactly once.
Checkpointing creates a consistent snapshot across the entire cluster. During recovery, operator state is restored and the sources are replayed from the most recent checkpoint.
For a more thorough explanation, see this data Artisans blog post: High-throughput, low-latency, and exactly-once stream processing with Apache Flink™, or the Flink docs.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With