 

Kafka: Sarama, idempotence and transactional.id

Does Shopify/sarama provide an option similar to transactional.id in the JVM API?

The library supports idempotence (Config.Producer.Idempotent, similar to enable.idempotence), but I don't understand how to use it without transactional.id.

Please correct me if I'm wrong, but the documentation for these options in Sarama is a bit lacking. According to the JVM docs, idempotence without the identifier is limited to a single producer session. In other words, we lose the guarantee when the producer fails and restarts.

I found relevant properties in the source code and some tests (for example), but I don't understand how to use them externally.

Timurib asked Jun 01 '21 13:06


1 Answer

Shopify/sarama provides exactly-once writes (idempotency) through the idempotent producer (Config.Producer.Idempotent). For that to work, the following configuration must be in place.

From Shopify/sarama/config.go (Config.Validate()):

    if c.Producer.Idempotent {
        if !c.Version.IsAtLeast(V0_11_0_0) {
            return ConfigurationError("Idempotent producer requires Version >= V0_11_0_0")
        }
        if c.Producer.Retry.Max == 0 {
            return ConfigurationError("Idempotent producer requires Producer.Retry.Max >= 1")
        }
        if c.Producer.RequiredAcks != WaitForAll {
            return ConfigurationError("Idempotent producer requires Producer.RequiredAcks to be WaitForAll")
        }
        if c.Net.MaxOpenRequests > 1 {
            return ConfigurationError("Idempotent producer requires Net.MaxOpenRequests to be 1")
        }
    }
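With the real library, this means setting Version, Producer.Retry.Max, Producer.RequiredAcks = sarama.WaitForAll and Net.MaxOpenRequests = 1 on the config returned by sarama.NewConfig(), alongside Producer.Idempotent = true. To keep the sketch below runnable without Sarama, it mirrors the four checks on a hypothetical idempotentSettings struct (not a Sarama type), so you can see which combinations pass and which error each one triggers.

```go
package main

import (
	"errors"
	"fmt"
)

// idempotentSettings is a hypothetical stand-in for the Sarama fields checked
// in config.go. It is NOT Sarama's Config type; it only mirrors the rules so
// they can be exercised without the library.
type idempotentSettings struct {
	VersionAtLeast0110 bool // stands in for c.Version.IsAtLeast(V0_11_0_0)
	RetryMax           int  // c.Producer.Retry.Max
	WaitForAll         bool // c.Producer.RequiredAcks == WaitForAll
	MaxOpenRequests    int  // c.Net.MaxOpenRequests
}

// validate applies the same four checks, in the same order, with the same messages.
func (s idempotentSettings) validate() error {
	switch {
	case !s.VersionAtLeast0110:
		return errors.New("Idempotent producer requires Version >= V0_11_0_0")
	case s.RetryMax == 0:
		return errors.New("Idempotent producer requires Producer.Retry.Max >= 1")
	case !s.WaitForAll:
		return errors.New("Idempotent producer requires Producer.RequiredAcks to be WaitForAll")
	case s.MaxOpenRequests > 1:
		return errors.New("Idempotent producer requires Net.MaxOpenRequests to be 1")
	}
	return nil
}

func main() {
	ok := idempotentSettings{VersionAtLeast0110: true, RetryMax: 5, WaitForAll: true, MaxOpenRequests: 1}
	fmt.Println(ok.validate()) // <nil>

	// Allowing more than one in-flight request per connection fails the last check.
	bad := ok
	bad.MaxOpenRequests = 5
	fmt.Println(bad.validate())
}
```

The single in-flight request is what keeps retried batches from overtaking newer ones, which would otherwise break the ordering of sequence numbers the broker relies on.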

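In Shopify/sarama this is handled by the transactionManager of the AsyncProducer (see Shopify/sarama/async_producer.go). When the producer is initialised, it obtains a producer ID (PID) and a producerEpoch from the broker, and it keeps a separate sequence number for every topic-partition. Each message is sent with the PID, the epoch and the next sequence number for its partition, and that sequence number is incremented for every message. The bumpEpoch() function in async_producer.go increments the epoch and resets the sequence numbers.

The broker remembers the last sequence number it accepted from each producer for each partition. If a retry re-sends a message that was already written, the broker recognises the sequence number as a duplicate and does not append the message again.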
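A toy model may help picture this. The sketch below is not Sarama code: producer and broker are hypothetical types, and the broker check is simplified (real Kafka also rejects sequence gaps and tracks more state). It only shows why a retry carrying the same PID, epoch, partition and sequence number is appended once.

```go
package main

import "fmt"

// key identifies one producer session's stream into one partition.
type key struct {
	pid       int64
	epoch     int16
	partition int32
}

// broker is a toy model of the broker-side check: it remembers the last
// sequence number it appended per key and drops any record whose sequence
// it has already seen. Gap detection (out-of-order sequences) is omitted.
type broker struct {
	lastSeq map[key]int32
	log     []string
}

func newBroker() *broker { return &broker{lastSeq: map[key]int32{}} }

// produce reports whether the record was appended (false means duplicate).
func (b *broker) produce(k key, seq int32, value string) bool {
	if last, ok := b.lastSeq[k]; ok && seq <= last {
		return false // a retry of a record that is already in the log
	}
	b.lastSeq[k] = seq
	b.log = append(b.log, value)
	return true
}

// producer hands out per-partition sequence numbers (a simplified,
// hypothetical analogue of Sarama's transactionManager).
type producer struct {
	pid   int64
	epoch int16
	seq   map[int32]int32
}

func (p *producer) next(partition int32) (key, int32) {
	s := p.seq[partition]
	p.seq[partition] = s + 1
	return key{p.pid, p.epoch, partition}, s
}

func main() {
	b := newBroker()
	p := &producer{pid: 1000, seq: map[int32]int32{}}

	k, s := p.next(0)
	fmt.Println(b.produce(k, s, "order-42")) // true: first attempt appended
	// The ack is lost, so the producer retries with the SAME sequence number.
	fmt.Println(b.produce(k, s, "order-42")) // false: broker drops the duplicate
	fmt.Println(len(b.log))                  // 1
}
```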

Read this example. It describes how idempotence works.

You are correct about the producer session: the exactly-once guarantee is given only within a single producer session. If the producer is restarted right after a failed send, there can be a duplicate.

When the producer restarts, it is assigned a new PID, so the broker cannot relate the new session's sequence numbers to those of the old one. Within one session, even though the producer retries requests on failure, each message is persisted in the log exactly once. Duplicates can still appear depending on the source the producer reads its data from: Kafka won't detect data that the producer itself receives twice. So, in some cases, you may need an additional de-duplication system.
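The restart case can be sketched in the same toy style. The types are hypothetical, the broker check is reduced to "drop a sequence number this PID already wrote" on a single partition, and dedupByKey stands for one possible application-level de-duplication keyed by a hypothetical business ID such as an order ID. Because the second session has a new PID, the broker accepts its copy; only the key-based filter removes it.

```go
package main

import "fmt"

// record is what lands in the (single) partition log in this toy model.
type record struct {
	pid   int64
	seq   int32
	key   string // hypothetical application-level ID, e.g. an order ID
	value string
}

// appendIdempotent mimics the broker check: a record is dropped only if the
// SAME PID already wrote that sequence number (no epochs, no gap checks).
func appendIdempotent(log []record, seen map[int64]int32, r record) []record {
	if last, ok := seen[r.pid]; ok && r.seq <= last {
		return log
	}
	seen[r.pid] = r.seq
	return append(log, r)
}

// dedupByKey keeps the first record per application key. A real system
// would need durable storage for the keys it has already seen.
func dedupByKey(log []record) []record {
	seen := map[string]bool{}
	var out []record
	for _, r := range log {
		if !seen[r.key] {
			seen[r.key] = true
			out = append(out, r)
		}
	}
	return out
}

func main() {
	var log []record
	seen := map[int64]int32{}

	// Session 1 (PID 1) writes the order, then crashes before it records
	// that the source event was handled.
	log = appendIdempotent(log, seen, record{pid: 1, seq: 0, key: "order-42", value: "created"})
	// Session 2 gets a NEW PID, re-reads the same source event and starts at sequence 0.
	log = appendIdempotent(log, seen, record{pid: 2, seq: 0, key: "order-42", value: "created"})

	fmt.Println(len(log))             // 2: the broker cannot tell the sessions apart
	fmt.Println(len(dedupByKey(log))) // 1: key-based de-duplication removes the copy
}
```

The in-memory map is only there to keep the example short; in practice the seen keys would live in durable storage, or the downstream write itself would be made idempotent.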

nipuna answered Oct 05 '22 23:10