
How to pick a Kafka transactional.id

I wonder if I could get some help understanding transactions in Kafka, and in particular how to use transactional.id. Here's the context:

  1. My Kafka application follows the pattern: consume message from input topic, process, publish to output topic.
  2. I am not using the Kafka Streams API.
  3. I have multiple consumers in a single consumer group and each consumer is in its own polling thread.
  4. There is a thread pool with worker threads that do the message processing and publishing to the output topic. At the moment, each thread has its own producer instance.
  5. I am using the producer transactions API to ensure that the consumer offset commit and the publish to the output topic happen atomically (roughly as in the sketch below).
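
For illustration, here is roughly how each worker thread currently builds its producer. This is a simplified sketch; the broker address, serializers and the "my-app" id prefix are just placeholders:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class WorkerProducers {
    // Current approach: one producer per worker thread, with a transactional.id
    // that is merely unique (timestamp + worker index), not stable across restarts.
    static KafkaProducer<String, String> createProducer(int workerIndex) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                "my-app-" + System.currentTimeMillis() + "-" + workerIndex);
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions(); // registers the transactional.id with the transaction coordinator
        return producer;
    }
}
```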

My assumptions to date have included:

  1. If my process crashed mid-transaction, then nothing from that transaction would have been published and no consumer offset would have moved. So upon restart, I would simply start the transaction again from the original consumer offset.
  2. For the producer transactional.id, all that mattered was that it was unique. I could therefore generate a timestamp-based ID at start-up.

Then I read the following blog: https://www.confluent.io/blog/transactions-apache-kafka/. In particular, in the section "How to pick a transaction id", it seems to imply that I need to guarantee a dedicated producer instance per input partition. It says: "The key to fencing out zombies properly is to ensure that the input topics and partitions in the read-process-write cycle is always the same for a given transactional.id.". It further cites the problem example as follows: "For instance, in a distributed stream processing application, suppose topic-partition tp0 was originally processed by transactional.id T0. If, at some point later, it could be mapped to another producer with transactional.id T1, there would be no fencing between T0 and T1. So it is possible for messages from tp0 to be reprocessed, violating the exactly once processing guarantee."

I can't quite understand why this is the case. To my mind, I shouldn't care which producer handles messages from any given partition, as long as transactions are atomic. I've been struggling with this for a day and I wonder if someone could tell me what I've missed here. So, why can't I assign work to any producer instance with any transactional.id setting, as long as it is unique? And why do they say that messages can leak through the fencing provided by transactions if you do this?

asked May 14 '18 by Radiator


People also ask

What is Kafka transaction ID?

We solve the problem of zombie instances by requiring that each transactional producer be assigned a unique identifier called the transactional.id. This is used to identify the same producer instance across process restarts.

How do transactions work in Kafka?

In a consumer-side transaction, the Kafka consumer consumes Avro messages from the topic, processes them, and saves the processed results to an external database, where the offsets are also stored. Finally, all of the database operations are committed atomically.

Does Kafka support transaction?

Indeed, Kafka supports transactions, but only transactions across Kafka topics and partitions themselves. We can atomically receive an incoming message, process it, send some outgoing messages, and, at the same time, reliably acknowledge the receipt of the original message.

What is transactional producer in Kafka?

The transactional producer allows an application to send messages to multiple partitions (and topics!) atomically. To enable idempotence, the enable.idempotence configuration must be set to true. If set, the retries config will default to Integer.MAX_VALUE.
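
For illustration, a minimal sketch of such an atomic multi-topic write; the broker address, topic names and the transactional.id are made-up placeholders:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AtomicMultiTopicSend {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Setting a transactional.id implicitly enables idempotence as well.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-tx-producer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            // Both writes commit or abort together, even though they target different topics.
            producer.send(new ProducerRecord<>("orders", "o-1", "created"));
            producer.send(new ProducerRecord<>("audit-log", "o-1", "order created"));
            producer.commitTransaction();
        }
    }
}
```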

What are transactions in Kafka?

Transactions enable atomic writes to multiple Kafka topics and partitions. All of the messages included in the transaction will be successfully written or none of them will be. For example, an error during processing can cause a transaction to be aborted, in which case none of the messages from the transaction will be readable by consumers.

How do I set a unique transactional.id in Kafka Streams?

When using the Streams API (in contrast to the regular Kafka producers) you do not have to worry about setting a unique transactional.id per instance of your stream application. When you enable Streams exactly_once semantics, the Streams API will generate the proper, unique transactional.id based on the topic and partition.

How do I record offset commits in Kafka?

In Kafka, we record offset commits by writing to an internal Kafka topic called the offsets topic. A message is considered consumed only when its offset is committed to the offsets topic.
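
A minimal sketch of how that looks from a transactional producer's point of view (the group id and topic name are placeholders); the commit only becomes visible if the surrounding transaction commits:

```java
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;

public class OffsetsInTransaction {
    // Commits the consumed offset as part of the producer's currently open transaction.
    // Under the hood this is a write to the internal offsets topic, so it only becomes
    // visible once the transaction commits. (Newer clients prefer the overload that
    // takes ConsumerGroupMetadata instead of a plain group id string.)
    static void commitOffsetInTx(KafkaProducer<String, String> producer, String groupId,
                                 String topic, int partition, long lastConsumedOffset) {
        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
                new TopicPartition(topic, partition),
                new OffsetAndMetadata(lastConsumedOffset + 1)); // commit the *next* offset to read
        producer.sendOffsetsToTransaction(offsets, groupId);
    }
}
```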

How do I work with external systems in Kafka?

The recommended way to work with a transactional process that includes an external system is to write the output of the Kafka transaction to a topic and then rely on idempotence as you propagate that data to the external system.
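
For example, the propagator could be a plain consumer in read_committed mode that performs idempotent upserts. This is only a sketch; "output-topic", the group id and upsertIntoExternalDb are illustrative placeholders:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OutputPropagator {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "db-propagator");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Only see messages from committed transactions.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("output-topic"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    // Idempotent write: an upsert keyed on record.key() can safely be retried,
                    // so occasional redelivery to the external system does no harm.
                    upsertIntoExternalDb(record.key(), record.value());
                }
            }
        }
    }

    static void upsertIntoExternalDb(String key, String value) {
        // placeholder for an idempotent UPSERT against the external database
    }
}
```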


4 Answers

The blog article you mentioned has all the information you're looking for, although it's rather dense.

From the Why Transactions? section in the aforementioned article:

Using vanilla Kafka producers and consumers configured for at-least-once delivery semantics, a stream processing application could lose exactly once processing semantics in the following ways:

  1. The producer.send() could result in duplicate writes of message B due to internal retries. This is addressed by the idempotent producer and is not the focus of the rest of this post.

  2. We may reprocess the input message A, resulting in duplicate B messages being written to the output, violating the exactly once processing semantics. Reprocessing may happen if the stream processing application crashes after writing B but before marking A as consumed. Thus when it resumes, it will consume A again and write B again, causing a duplicate.

  3. Finally, in distributed environments, applications will crash or—worse!—temporarily lose connectivity to the rest of the system. Typically, new instances are automatically started to replace the ones which were deemed lost. Through this process, we may have multiple instances processing the same input topics and writing to the same output topics, causing duplicate outputs and violating the exactly once processing semantics. We call this the problem of “zombie instances.” [emphasis added]

From the Transactional Semantics section in the same article:

Zombie fencing

We solve the problem of zombie instances by requiring that each transactional producer be assigned a unique identifier called the transactional.id. This is used to identify the same producer instance across process restarts. [emphasis added]

The API requires that the first operation of a transactional producer should be to explicitly register its transactional.id with the Kafka cluster. When it does so, the Kafka broker checks for open transactions with the given transactional.id and completes them. It also increments an epoch associated with the transactional.id. The epoch is an internal piece of metadata stored for every transactional.id.

Once the epoch is bumped, any producers with the same transactional.id and an older epoch are considered zombies and are fenced off, i.e. future transactional writes from those producers are rejected. [emphasis added]

And from the Data flow section in the same article.

A: the producer and transaction coordinator interaction

When executing transactions, the producer makes requests to the transaction coordinator at the following points:

  1. The initTransactions API registers a transactional.id with the coordinator. At this point, the coordinator closes any pending transactions with that transactional.id and bumps the epoch to fence out zombies. This happens only once per producer session. [emphasis added]

  2. When the producer is about to send data to a partition for the first time in a transaction, the partition is registered with the coordinator first.

  3. When the application calls commitTransaction or abortTransaction, a request is sent to the coordinator to begin the two phase commit protocol.
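
Putting those three interaction points together, a minimal single-threaded read-process-write loop could look roughly like this. It's only a sketch: the broker address, topic names, group id and the "my-app-input-topic-0" transactional.id are placeholders, and error/abort handling is omitted:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ReadProcessWriteLoop {
    public static void main(String[] args) {
        Properties c = new Properties();
        c.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        c.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        c.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // offsets are committed via the transaction
        c.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        c.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        c.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        p.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-app-input-topic-0"); // stable per input partition
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c);
             KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {

            consumer.subscribe(Collections.singletonList("input-topic"));
            producer.initTransactions(); // (1) register transactional.id, bump the epoch

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (records.isEmpty()) continue;

                producer.beginTransaction();
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (ConsumerRecord<String, String> rec : records) {
                    // (2) the first send to each output partition registers it with the coordinator
                    producer.send(new ProducerRecord<>("output-topic", rec.key(), rec.value().toUpperCase()));
                    offsets.put(new TopicPartition(rec.topic(), rec.partition()),
                            new OffsetAndMetadata(rec.offset() + 1));
                }
                // the consumed offsets ride along in the same transaction
                producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                producer.commitTransaction(); // (3) two-phase commit via the coordinator
            }
        }
    }
}
```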

Hope this helps!

answered Oct 24 '22 by Ryan Sobol


Consider the situation where the consumer group populace is in flux (new consumers are coming online or going offline) or a failure scenario causes the rebalancing of topic-partition assignments within a consumer group.

Now assume a consumer C0 had previously been assigned partition P0. This consumer is happily chugging away, processing messages, publishing new ones, etc. (the standard consume-transform-publish pattern). A rebalance event occurs, resulting in P0 being unceremoniously (always wanted to use that word) revoked from C0 and assigned to C1. From the perspective of C0, it might still have a backlog of messages to churn through, and it is oblivious to the reassignment. You end up in a situation where, for a very brief period, both C0 and C1 may believe they 'own' P0 and will act accordingly, creating duplicate messages in the outgoing topic and, worse, potentially having those duplicates appear out of order.

The use of transactional.id enables the 'fencing' that the original blog refers to. As part of the reassignment, the new producer will act under the incremented epoch number, while the existing one will still use the old epoch. Fencing is then trivial; drop messages where the epoch has lapsed.
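
In application code, being fenced surfaces as a ProducerFencedException on the zombie's transactional calls. A minimal sketch of handling it (the method name is made up):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.errors.ProducerFencedException;

public class FencingAware {
    // Once another producer has initialised the same transactional.id with a newer
    // epoch, this producer's transactional calls fail and it must be closed.
    static void commitOrDie(KafkaProducer<String, String> producer) {
        try {
            producer.commitTransaction();
        } catch (ProducerFencedException fenced) {
            // A newer producer owns this transactional.id; this instance is a zombie.
            producer.close();
            throw fenced; // let the worker shut down or re-resolve its assignment
        }
    }
}
```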

There are a few gotchas with Kafka transactions:

  • The inbound and outbound topics must be on the same cluster for transactions to work.
  • The naming of transactional.id is crucial for producer 'handover', even if you don't care about zombie fencing. The emergence of the new producer will instigate the tidying up of any orphaned in-flight transactions for the lapsed producer, hence the requirement for the ID to be stable/repeatable across producer sessions. Do not use random IDs for this; not only will it lead to incomplete transactions (which block every consumer in READ_COMMITTED mode), but it will also accumulate additional state on the transaction coordinator (running on the broker). By default, this state is persisted for 7 days, so you don't want to spawn arbitrarily named transactional producers on a whim.
  • Ideally, transactional.id reflects the combination of the inbound topic and partition. (Unless, of course, you have a single-partition topic.) In practice, this means creating a new transactional producer for every partition assigned to the consumer; see the sketch after this list. (Remember, in a consume-transform-publish scenario, a producer is also a consumer, and consumer partition assignments will vary with each rebalancing event.) Have a look at the spring-kafka implementation, which lazily creates a new producer for each inbound partition. (There is something to be said about the safety of this approach, and whether producers should be cleaned up on partition reassignment, but that's another matter.)
  • The fencing mechanism only operates at Kafka level. In other words, it isolates the lapsed producer from Kafka, but not from the rest of the world. This means that if your producer also has to update some external state (in a database, cache, etc.) as part of the consume-transform-publish cycle, it is the responsibility of the application to fence itself from the database upon partition reassignment, or otherwise ensure the idempotency of the update.
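
To make the per-partition producer point concrete, here is a rough sketch of lazily creating one transactional producer per assigned input partition. The broker address and the "my-app" naming scheme are placeholders, and thread-safety and cleanup on revocation are deliberately ignored:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringSerializer;

public class PerPartitionProducers {
    // Not thread-safe: assumes one instance per polling thread.
    private final Map<TopicPartition, KafkaProducer<String, String>> producers = new HashMap<>();

    // Lazily creates one transactional producer per assigned input partition.
    // The transactional.id is stable across restarts because it is derived from
    // the application name, input topic and partition, not from a timestamp.
    KafkaProducer<String, String> producerFor(TopicPartition inputPartition) {
        return producers.computeIfAbsent(inputPartition, tp -> {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                    "my-app-" + tp.topic() + "-" + tp.partition());
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.initTransactions(); // fences any zombie still holding this id
            return producer;
        });
    }
}
```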

Just for completeness, it's worth pointing out that this is not the only way to achieve fencing. The Kafka consumer API provides the ability to register a ConsumerRebalanceListener, which gives the displaced consumer a last-chance way of draining (or shedding) any outstanding backlog before the partitions are handed to the new consumer. The callback is blocking; when it returns, it is assumed that the handler has fenced itself off locally. Then, and only then, will the new consumer resume processing.
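
A bare-bones sketch of that approach (the topic name is a placeholder; the callback bodies are where the draining/fencing work would go):

```java
import java.util.Collection;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RebalanceFencing {
    // Use the (blocking) rebalance callback to finish or abort any in-flight
    // work for revoked partitions before the new owner starts consuming them.
    static void subscribe(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Collections.singletonList("input-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> revoked) {
                // Drain or shed the local backlog for these partitions and
                // commit/abort any open transactions before returning.
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> assigned) {
                // Safe to (re)create per-partition producers here if needed.
            }
        });
    }
}
```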

answered Oct 24 '22 by Emil Koutanov


When using the Streams API (in contrast to the regular Kafka producers) you do not have to worry about setting a unique transactional.id per instance of your stream application. When you enable Streams exactly_once semantics, the Streams API will generate the proper, unique transactional.id based on the topic and partition.

Check out exactly what is done here: https://github.com/axbaretto/kafka/blob/fe51708ade3cdf4fe9640c205c66e3dd1a110062/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L455

The Task (referring to TaskId in the code) is explained here: https://docs.confluent.io/current/streams/architecture.html#stream-partitions-and-tasks
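
For reference, enabling this in a Streams application is just a configuration switch. A minimal sketch, assuming a reasonably recent Kafka Streams version (older versions use StreamsConfig.EXACTLY_ONCE instead of EXACTLY_ONCE_V2), with placeholder application id and broker address:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsExactlyOnce {
    static Properties streamsConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Streams then derives stable, per-task transactional.ids internally.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        return props;
    }
}
```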

answered Oct 24 '22 by Vassilis


If you are using spring-kafka, things are already taken care of for you; the only thing required is a prefix:

Transactions are enabled by providing the DefaultKafkaProducerFactory with a transactionIdPrefix. In that case, instead of managing a single shared Producer, the factory maintains a cache of transactional producers. When the user calls close() on a producer, it is returned to the cache for reuse instead of actually being closed. The transactional.id property of each producer is transactionIdPrefix + n, where n starts with 0 and is incremented for each new producer, unless the transaction is started by a listener container with a record-based listener. In that case, the transactional.id is <transactionIdPrefix>.<group.id>.<topic>.<partition>. This is to properly support fencing zombies, as described here. This new behavior was added in versions 1.3.7, 2.0.6, 2.1.10, and 2.2.0. If you wish to revert to the previous behavior, you can set the producerPerConsumerPartition property on the DefaultKafkaProducerFactory to false.

Cited from https://docs.spring.io/spring-kafka/reference/html/#transactions
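
A rough sketch of the wiring described above; the bean names, broker address and the "my-app-tx-" prefix are placeholders, not the canonical configuration:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;

@Configuration
public class KafkaTxConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(props);
        // Turns on transactions; for record listeners the framework derives
        // <prefix>.<group.id>.<topic>.<partition> ids, giving the stable
        // per-partition transactional.ids needed for zombie fencing.
        factory.setTransactionIdPrefix("my-app-tx-");
        return factory;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }

    @Bean
    public KafkaTransactionManager<String, String> kafkaTransactionManager(ProducerFactory<String, String> pf) {
        return new KafkaTransactionManager<>(pf);
    }
}
```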

answered Oct 24 '22 by Nick Allen