
UnknownProducerIdException in Kafka streams when enabling exactly once

After enabling exactly-once processing on a Kafka Streams application, the following error appears in the logs:

ERROR o.a.k.s.p.internals.StreamTask - task [0_0] Failed to close producer 
due to the following error:

org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort 
sending since an error caught with a previous record (key 222222 value 
some-value timestamp 1519200902670) to topic exactly-once-test-topic-v2
due to This exception is raised by the broker if it could not
locate the producer metadata associated with the producerId in 
question. This could happen if, for instance, the producer's records 
were deleted because their retention time had elapsed. Once the last 
records of the producerId are removed, the producer's metadata is 
removed from the broker, and future appends by the producer will 
return this exception.
  at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:125)
  at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:48)
  at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:180)
  at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1199)
  at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
  at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
  at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
  at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596)
  at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
  at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
  at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
  at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
  at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
  at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
  at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.UnknownProducerIdException

We've reproduced the issue with a minimal test case where we move messages from a source stream to another stream without any transformation. The source stream contains millions of messages produced over several months. The KafkaStreams object is created with the following StreamsConfig (a minimal sketch of the setup is shown below):

  • StreamsConfig.PROCESSING_GUARANTEE_CONFIG = "exactly_once"
  • StreamsConfig.APPLICATION_ID_CONFIG = "Some app id"
  • StreamsConfig.NUM_STREAM_THREADS_CONFIG = 1
  • ProducerConfig.BATCH_SIZE_CONFIG = 102400

The app is able to process some messages before the exception occurs.
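
For reference, a minimal sketch of what such a pass-through app might look like; the bootstrap address and the source topic name are placeholders, while the sink topic name is taken from the log above:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    public class ExactlyOnceCopyApp {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "Some app id");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");   // placeholder
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
            props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
            // pass the batch size through to the embedded producer
            props.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), 102400);
            // copy the raw bytes, no (de)serialization of the payload
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());

            // move messages from the source topic to the sink topic without any transformation
            StreamsBuilder builder = new StreamsBuilder();
            builder.stream("source-topic").to("exactly-once-test-topic-v2");     // source name is a placeholder

            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }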

Context information:

  • We're running a 5-node Kafka 1.1.0 cluster with 5 ZooKeeper nodes.
  • There are multiple instances of the app running.

Has anyone seen this problem before or can give us any hints about what might be causing this behaviour?

Update

We created a new 1.1.0 cluster from scratch and started to process new messages without problems. However, when we imported old messages from the old cluster, we hit the same UnknownProducerIdException after a while.

Next we tried setting cleanup.policy on the sink topic to compact while keeping retention.ms at 3 years. This time the error did not occur. However, messages seem to have been lost: the source offset is 106 million while the sink offset is only 100 million.
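
For completeness, this is roughly how that topic change can be applied with the Java AdminClient; the broker address is a placeholder, and since the non-incremental alterConfigs call replaces the full set of topic-level overrides, both settings are passed together:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class CompactSinkTopic {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");   // placeholder

            try (AdminClient admin = AdminClient.create(props)) {
                ConfigResource topic =
                    new ConfigResource(ConfigResource.Type.TOPIC, "exactly-once-test-topic-v2");
                Config config = new Config(Arrays.asList(
                    new ConfigEntry("cleanup.policy", "compact"),
                    new ConfigEntry("retention.ms", "94608000000")));                // ~3 years
                admin.alterConfigs(Collections.singletonMap(topic, config)).all().get();
            }
        }
    }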

Asked Apr 17 '18 by Odinodin


People also ask

How does Kafka Streams guarantee exactly-once processing?

When the processing guarantee is configured to exactly_once, Kafka Streams sets the internal embedded producer client with a transaction id to enable the idempotence and transactional messaging features, and also sets its consumer client with the read-committed mode to only fetch messages from committed transactions from the upstream ...
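
The read-committed side of this can be illustrated with a plain consumer; a minimal sketch, assuming a 2.0+ client for the Duration-based poll, with the broker address and group id as placeholders:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ReadCommittedConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");   // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "read-committed-example");  // placeholder
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");   // skip records from aborted transactions
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("exactly-once-test-topic-v2"));
                while (true) {
                    for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                        System.out.println(record.key() + " -> " + record.value());
                    }
                }
            }
        }
    }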

When should you not use Kafka Streams?

If you just have a producer producing messages, you don't need Kafka Streams. Similarly, if you consume messages from one Kafka cluster but publish to topics on a different Kafka cluster, you can still use Kafka Streams, but you have to use a separate producer to publish the messages to the other cluster, as in the sketch below.
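
A rough sketch of that second case, consuming from one cluster and publishing to another with a separate producer; cluster addresses, topic names and the group id are placeholders, and it assumes a 2.0+ client for the Duration-based poll:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class CrossClusterBridge {
        public static void main(String[] args) {
            Properties consumerProps = new Properties();
            consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "cluster-a:9092"); // placeholder
            consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "bridge");                  // placeholder
            consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            Properties producerProps = new Properties();
            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "cluster-b:9092"); // placeholder
            producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
                 KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
                consumer.subscribe(Collections.singletonList("source-topic"));            // placeholder
                while (true) {
                    for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                        // forward each record to the topic on the other cluster
                        producer.send(new ProducerRecord<>("sink-topic", record.key(), record.value()));
                    }
                }
            }
        }
    }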

Is Kafka at least once?

An at-least-once guarantee means you will definitely receive and process every message, but you may process some messages additional times in the face of a failure.
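
A common way to get exactly that behaviour with a plain consumer is to disable auto-commit and commit offsets only after the records have been processed; a minimal sketch with placeholder broker address, group id and topic:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class AtLeastOnceConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "at-least-once-example"); // placeholder
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");       // commit manually
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("some-topic"));    // placeholder
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : records) {
                        process(record);       // may run again for the same record after a crash
                    }
                    consumer.commitSync();     // commit only after processing -> at least once
                }
            }
        }

        private static void process(ConsumerRecord<String, String> record) {
            System.out.println(record.key() + " -> " + record.value());
        }
    }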

What is the grace period in Kafka Streams?

The grace period is a parameter of windowed operations such as windowed or session aggregations, or stream-stream joins. It determines how long after a window ends any new data will still be processed.
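
As an illustration, a sketch of a windowed count with an explicit grace period; it uses the grace() API introduced in Kafka Streams 2.1, and the topic name is a placeholder:

    import java.time.Duration;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.TimeWindows;

    public class GracePeriodExample {
        // Count events per key in 5-minute windows and keep accepting
        // out-of-order records until 1 minute after each window ends.
        static Topology build() {
            StreamsBuilder builder = new StreamsBuilder();
            builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
                   .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                   .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)))
                   .count()
                   .toStream()
                   .foreach((windowedKey, count) -> System.out.println(windowedKey + " -> " + count));
            return builder.build();
        }
    }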


1 Answer

As explained in the comments, there currently seems to be a bug that may cause problems when replaying messages older than the (maximum configurable?) retention time.

At the time of writing this is unresolved; the latest status can always be seen here:

https://issues.apache.org/jira/browse/KAFKA-6817

Answered Sep 23 '22 by Dennis Jaheruddin