I am trying to load around 50K messages into a Kafka topic. During the first few runs I get the exception below, but not every time.
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784) ~[kafka-clients-2.0.0.jar:?]
at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:229) ~[kafka-clients-2.0.0.jar:?]
at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:679) ~[kafka-clients-2.0.0.jar:?]
at myPackage.persistUpdatesPostAction(MyCode.java:??) ~[aKafka.jar:?]
...
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer
attempted an operation with an old epoch. Either there is a newer producer with
the same transactionalId, or the producer's transaction has been expired by the
broker.
The code is below:
public void persistUpdatesPostAction(List<Message> messageList) {
    if ((messageList == null) || (messageList.isEmpty())) {
        return;
    }
    logger.createDebug("Messages in batch(postAction) : " + messageList.size());

    Producer<String, String> producer = KafkaUtils.getProducer(Thread.currentThread().getName());

    try {
        producer.beginTransaction();
        createKafkaBulkInsert1(producer, messageList, "Topic1");
        createKafkaBulkInsert2(producer, messageList, "Topic2");
        createKafkaBulkInsert3(producer, messageList, "Topic3");
        producer.commitTransaction();
    } catch (Exception e) {
        producer.abortTransaction();
        producer.close();
        KafkaUtils.removeProducer(Thread.currentThread().getName());
    }
}
-----------
static Properties setPropertiesProducer() {
    Properties temp = new Properties();
    temp.put("bootstrap.servers", "localhost:9092");
    temp.put("acks", "all");
    temp.put("retries", 1);
    temp.put("batch.size", 16384);
    temp.put("linger.ms", 5);
    temp.put("buffer.memory", 33554432);
    temp.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    temp.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    return temp;
}

public static Producer<String, String> getProducer(String aThreadId) {
    if ((producerMap.size() == 0) || (producerMap.get(aThreadId) == null)) {
        Properties temp = producerProps;
        temp.put("transactional.id", aThreadId);
        Producer<String, String> producer = new KafkaProducer<String, String>(temp);
        producerMap.put(aThreadId, producer);
        producer.initTransactions();
        return producer;
    }
    return producerMap.get(aThreadId);
}

public static void removeProducer(String aThreadId) {
    logger.createDebug("Removing Thread ID :" + aThreadId);
    if (producerMap.get(aThreadId) == null)
        return;
    producerMap.remove(aThreadId);
}
A batch of data is consumed by a Kafka consumer from one cluster (called the "source") and then immediately produced to another cluster (called the "target") by a Kafka producer. To ensure exactly-once delivery, the producer creates a new transaction through a "coordinator" each time it receives a batch of data from the consumer.
Because we've enabled idempotence, Kafka uses this transactional id as part of its algorithm to deduplicate any message this producer sends. Simply put, if the producer accidentally sends the same message to Kafka more than once, these settings enable it to notice.
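For illustration, here is a minimal sketch of that read-process-write loop, assuming kafka-clients 2.0.0. The cluster addresses, the topic names SourceTopic and TargetTopic, and the "mirror-group"/"mirror-" identifiers are placeholders invented for this example, not names from the code above:

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;

public class MirrorSketch {
    public static void main(String[] args) {
        Properties cprops = new Properties();
        cprops.put("bootstrap.servers", "source-cluster:9092"); // the "source" cluster
        cprops.put("group.id", "mirror-group");
        cprops.put("enable.auto.commit", "false");              // offsets go into the transaction
        cprops.put("isolation.level", "read_committed");
        cprops.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        cprops.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cprops);
        consumer.subscribe(Collections.singletonList("SourceTopic"));

        Properties pprops = new Properties();
        pprops.put("bootstrap.servers", "target-cluster:9092"); // the "target" cluster
        pprops.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        pprops.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Setting transactional.id also turns on idempotence; keep it unique per producer.
        pprops.put("transactional.id", "mirror-" + UUID.randomUUID());
        Producer<String, String> producer = new KafkaProducer<>(pprops);
        producer.initTransactions(); // registers the transactional.id with the coordinator

        while (true) {
            ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(500));
            if (batch.isEmpty()) {
                continue;
            }
            producer.beginTransaction(); // one transaction per consumed batch
            try {
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (ConsumerRecord<String, String> rec : batch) {
                    producer.send(new ProducerRecord<>("TargetTopic", rec.key(), rec.value()));
                    offsets.put(new TopicPartition(rec.topic(), rec.partition()),
                            new OffsetAndMetadata(rec.offset() + 1));
                }
                // Committing the consumer offsets inside the same transaction is what
                // makes the consume-then-produce step exactly-once.
                producer.sendOffsetsToTransaction(offsets, "mirror-group");
                producer.commitTransaction();
            } catch (ProducerFencedException e) {
                producer.close(); // a fenced producer cannot abort; it must be replaced
                return;
            } catch (KafkaException e) {
                producer.abortTransaction(); // recoverable: nothing from this batch becomes visible
            }
        }
    }
}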
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
This exception message is not very helpful. I believe that it is trying to say that the broker no longer has any record of the transaction id that is being sent by the client. This can be for one of two reasons:

- A newer producer instance registered the same transactionalId and fenced this one. That is unlikely unless transactional ids are shared between clients; we keep ours unique by generating them with UUID.randomUUID().
- The producer's transaction was expired (aborted and forgotten) by the broker because it took too long.

In our case, we were hitting transaction timeouts every so often, which generated this exception. There are two properties that govern how long the broker will remember a transaction before aborting it and forgetting about it.
transaction.max.timeout.ms
-- A broker property that specifies the maximum number of milliseconds until a transaction is aborted and forgotten. Default in many Kafka versions seems to be 900000 (15 minutes). Documentation from Kafka says:
The maximum allowed timeout for transactions. If a client’s requested transaction time exceeds this, then the broker will return an error in InitProducerIdRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction.
transaction.timeout.ms
-- A producer client property that sets the timeout in milliseconds when a transaction is created. Default in many Kafka versions seems to be 60000 (1 minute). Documentation from Kafka says:
The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction.
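So, if a batch can legitimately take longer than the one-minute default to commit, the client-side timeout can be raised where the producer properties are built. Here is a sketch against the setPropertiesProducer() method above; the 300000 value is illustrative and must stay at or below the broker's transaction.max.timeout.ms:

Properties temp = setPropertiesProducer();
// Allow up to 5 minutes per transaction (illustrative value). If this exceeds
// the broker's transaction.max.timeout.ms, initTransactions() fails with the
// InitProducerIdResponse error shown below.
temp.put("transaction.timeout.ms", 300000);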
If the transaction.timeout.ms property set in the client exceeds the transaction.max.timeout.ms property set in the broker, the producer will immediately throw something like the following exception:
org.apache.kafka.common.KafkaException: Unexpected error in
InitProducerIdResponse The transaction timeout is larger than the maximum value
allowed by the broker (as configured by transaction.max.timeout.ms).
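As a side note, this also explains why the stack trace at the top shows the failure surfacing from abortTransaction(): once a producer has been fenced or its transaction has expired, it can no longer abort. The pattern in the KafkaProducer javadoc is to close the producer on the fatal exceptions (ProducerFencedException, OutOfOrderSequenceException, and AuthorizationException, all from org.apache.kafka.common.errors) and only abort on other KafkaExceptions. Applied to the question's method, that would look roughly like this:

try {
    producer.beginTransaction();
    createKafkaBulkInsert1(producer, messageList, "Topic1");
    createKafkaBulkInsert2(producer, messageList, "Topic2");
    createKafkaBulkInsert3(producer, messageList, "Topic3");
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal for this producer instance: it cannot abort. Close it and drop it
    // from the cache so the next call builds a fresh one.
    producer.close();
    KafkaUtils.removeProducer(Thread.currentThread().getName());
} catch (KafkaException e) {
    // Recoverable error: abort so the batch can be retried with the same producer.
    producer.abortTransaction();
}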