I am using Kafka 1.0.1 in my application and have started using the idempotent producer feature that was introduced in 0.11, but I'm having trouble understanding the ordering guarantees when idempotence is enabled.
My producer's configuration is:
enable.idempotence = true
max.in.flight.requests.per.connection = 5
retries = 50
acks = all
According to the documentation:
retries
Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first.
enable.idempotence
When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer retries due to broker failures, etc., may write duplicates of the retried message in the stream. Note that enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5, retries to be greater than 0 and acks must be 'all'. If these values are not explicitly set by the user, suitable values will be chosen. If incompatible values are set, a ConfigException will be thrown.
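My configuration seems to meet these requirements, but the two descriptions don't seem to align: the retries text says ordering can change unless max.in.flight.requests.per.connection is 1, while the enable.idempotence text allows values up to 5.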
Another question I have concerns the OutOfOrderSequenceException:
According to the documentation, if I get this exception it means that the producer is at risk of becoming out of order. But if my producer is configured with max.in.flight.requests.per.connection = 5 and let's say that the second request got the out of order exception, what happens to all of the following requests that are already in flight? Will this mean that I am for sure out of order?
Ordering is guaranteed when idempotence is enabled in the KafkaProducer.
Even if max.in.flight.requests.per.connection is greater than 1 (up to the allowed maximum of 5), an idempotent KafkaProducer still ensures ordering within a TopicPartition. The note in the "retries" description about max.in.flight.requests.per.connection only applies when idempotence is disabled.
An idempotent KafkaProducer uses an internal, incrementing sequence number to ensure ordering with up to 5 in-flight requests, as described in Pull Request #3743:
"[...] we retain the record metadata for 5 older batches."
Also, the documentation for enable.idempotence states that at most 5 in-flight requests are allowed. Otherwise you get a ConfigException.
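The constraints quoted from the enable.idempotence documentation can be written down as a small check. The following is only an illustrative sketch in Python: check_idempotent_config is a hypothetical helper, not part of any Kafka client, and it assumes that a missing key is acceptable because the documentation says suitable values are chosen when a setting is not given explicitly.

```python
def check_idempotent_config(config):
    """Return a list of violated constraints (empty list means the config is fine).

    Hypothetical helper: it only mirrors the three constraints quoted from the
    enable.idempotence documentation and is not a Kafka API.
    """
    problems = []
    if not config.get("enable.idempotence", False):
        return problems  # the constraints only apply when idempotence is enabled

    # Missing keys are treated as acceptable (assumption: the client picks
    # suitable values when a setting is not explicitly given).
    max_in_flight = config.get("max.in.flight.requests.per.connection")
    if max_in_flight is not None and int(max_in_flight) > 5:
        problems.append("max.in.flight.requests.per.connection must be <= 5")

    retries = config.get("retries")
    if retries is not None and int(retries) <= 0:
        problems.append("retries must be > 0")

    acks = config.get("acks")
    if acks is not None and str(acks) not in ("all", "-1"):  # -1 is the same as all
        problems.append("acks must be 'all'")

    return problems


# The configuration from the question satisfies all three constraints.
question_config = {
    "enable.idempotence": True,
    "max.in.flight.requests.per.connection": 5,
    "retries": 50,
    "acks": "all",
}
print(check_idempotent_config(question_config))
```

With the configuration from the question the list of problems is empty, so no ConfigException would be expected for these settings.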
More details are given in KAFKA-5494 and the corresponding design document, "Design for max.in.flight > 1 with idempotence enabled", where steps 5 and 6 answer your question:
With the above assumptions in place, the solution is as follows:
1. We keep track of the last acknowledged sequence sent to a partition. This is updated on every successful ack and thus should always be increasing.
2. We keep track of the next sequence for a batch bound for a given partition.
3. We assigned the nextSequence when the batch is drained. We also increment the nextSequence by the record count of a batch.
4. If a produce request succeeds, we set the last ack’d sequence to be the last sequence of the batch.
5. If a produce request fails, succeeding in flight batches will also fail with an OutOfOrderSequenceException.
6. As such, if the sequence number of a batch is not the successor of the last ack’d sequence, and if it fails with an OutOfOrderSequenceException, we consider this to be retriable.
7. When a batch is requeued, we erase the producer id and sequence number before inserting it in the queue.
8. When the first inflight batch fails (for whatever reason), we reset the nextSequence to be the lastAckdSequence + 1.
9. Thus if a batch fails fatally, the sequence numbers of succeeding batches will be different on the retry. This is fine, because the previous failure was an OutOfSequence exception, which categorically means that the request was rejected.
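The steps above can be illustrated with a toy simulation. This is a deliberately simplified sketch in Python, not Kafka's actual implementation: ToyPartition and send_all are hypothetical names, a "transient failure" is modelled as a request that never reaches the log, all pending batches are treated as in flight at once, and retries are unbounded. It compares a partition that checks sequence numbers (idempotence enabled) with one that does not.

```python
class ToyPartition:
    """Toy broker-side log for a single partition (hypothetical model)."""

    def __init__(self, check_sequence):
        self.check_sequence = check_sequence  # True models idempotence enabled
        self.log = []
        self.last_acked_seq = -1

    def append(self, first_seq, records):
        # With sequence checking, a batch is accepted only if it directly
        # follows the last acknowledged sequence (steps 5 and 6).
        if self.check_sequence and first_seq != self.last_acked_seq + 1:
            return "OUT_OF_ORDER_SEQUENCE"
        self.log.extend(records)
        self.last_acked_seq = first_seq + len(records) - 1  # step 4
        return "OK"


def send_all(partition, batches, fail_once):
    """Send all batches, retrying failed ones until every batch is written.

    fail_once: set of batch indexes whose first attempt is lost to a
    transient error before reaching the log.
    """
    pending = list(range(len(batches)))  # batch indexes in their original order
    already_failed = set()
    while pending:
        next_seq = partition.last_acked_seq + 1  # step 8: reset nextSequence
        retry = []
        for idx in pending:  # one round of in-flight requests
            records = batches[idx]
            first_seq = next_seq          # step 3: assign the sequence
            next_seq += len(records)      # and advance by the record count
            if idx in fail_once and idx not in already_failed:
                already_failed.add(idx)   # transient failure, requeue (step 7)
                retry.append(idx)
            elif partition.append(first_seq, records) != "OK":
                retry.append(idx)         # rejected, retriable (step 6)
        pending = retry
    return partition.log


batches = [["a1", "a2"], ["b1"], ["c1"]]

# Sequence checking on, first batch fails once: order is preserved.
print(send_all(ToyPartition(check_sequence=True), batches, {0}))
# ['a1', 'a2', 'b1', 'c1']

# Sequence checking off, first batch fails once: later batches overtake it.
print(send_all(ToyPartition(check_sequence=False), batches, {0}))
# ['b1', 'c1', 'a1', 'a2']
```

In this model the later batches are rejected rather than written when an earlier batch is missing, so they are retried behind it and the log stays in order. The sketch does not cover the case where a batch was written but its acknowledgement was lost.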
"[...] if my producer is configured with max.in.flight.requests.per.connection = 5 and let's say that the second request got the out of order exception, what happens to all of the following requests that are already in flight? will this mean that I am for sure out of order?"
As mentioned in steps 5 and 6, all succeeding in-flight requests will also fail with a retriable OutOfOrderSequenceException. Because retries is greater than 0, the idempotent KafkaProducer will retry them and is able to keep the order.