 

Failed to rebalance error in Kafka Streams with more than one topic partition

Works fine when the source topic has a single partition. If I bump the partition count to any value > 1, I see the error below. This applies to both the low-level Processor API and the DSL. Any pointers? What could be missing?

org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:410)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_1] Store in-memory-avg-store's change log (cpu-streamz-in-memory-avg-store-changelog) does not contain partition 1
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:185)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
        at org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier$MemoryStore.init(InMemoryKeyValueStoreSupplier.java:102)
        at org.apache.kafka.streams.state.internals.InMemoryKeyValueLoggedStore.init(InMemoryKeyValueLoggedStore.java:56)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
        at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
        at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:119)
        at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
        at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
        at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
        at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
Asked Feb 19 '17 by Abhishek
1 Answer

It's an operational issue. Kafka Streams does not allow you to change the number of input topic partitions during its "lifetime".

If you stop a running Kafka Streams application, change the number of input topic partitions, and restart your app, it will break (with the error you see above). It is tricky to fix this for production use cases, and it is highly recommended not to change the number of input topic partitions (cf. comment below). For POCs/demos, though, it's not difficult to fix.

In order to fix this, you should reset your application using Kafka's application reset tool (a rough sketch follows the links below):

  • http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool
  • https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
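For completeness, here is a rough sketch of what such a reset looks like. The application id "cpu-streamz" is taken from the changelog topic name in your stack trace; the broker address, topic names, and the exact reset tool flags are assumptions that may differ for your setup and Kafka version (see the links above for the authoritative usage).

    import java.util.Properties;

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class ResetAndRestart {

        public static void main(String[] args) {
            // Step 1 (outside this program): run the application reset tool that ships
            // with Kafka against your brokers, along the lines of
            //   bin/kafka-streams-application-reset.sh --application-id cpu-streamz \
            //       --input-topics <your-input-topic> --bootstrap-servers localhost:9092
            // (exact flags depend on the Kafka version; see the linked documentation).

            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cpu-streamz");       // must match the id given to the reset tool
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            KStreamBuilder builder = new KStreamBuilder();
            builder.stream("cpu-streamz-input").to("cpu-streamz-output");        // placeholder topology

            KafkaStreams streams = new KafkaStreams(builder, props);

            // Step 2: wipe the local state directory for this application id so the
            // restarted instance rebuilds its state stores from scratch.
            streams.cleanUp();
            streams.start();
        }
    }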

Using the application reset tool has the disadvantage that you wipe out your whole application state. Thus, in order to get your application into the same state as before, you need to reprocess the whole input topic from the beginning. This is of course only possible if all input data is still available and nothing got deleted by brokers applying the topic retention time/size policy.

Furthermore, you should note that adding partitions to input topics changes the topic's partitioning schema (by default, hash-based partitioning by key). Because Kafka Streams assumes that input topics are correctly partitioned by key, if you use the reset tool and reprocess all data, you might get wrong results, as "old" data is partitioned differently than "new" data (i.e., data written after adding the new partitions). For production use cases, you would need to read all data from your original topic and write it into a new topic (with an increased number of partitions) to get your data partitioned correctly (of course, this step might change the ordering of records with different keys -- which should usually not be an issue -- I just wanted to mention it). Afterwards you can use the new topic as the input topic for your Streams app.

This repartitioning step can also be done easily within your Streams application itself, by using the operator through("new_topic_with_more_partitions") directly after reading the original topic and before doing any actual processing.
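A minimal sketch of that approach against the 0.10.x DSL (the topic names, String serdes, and the mapValues step are placeholders; the topic "new_topic_with_more_partitions" is assumed to have been created manually with the desired higher partition count before the application starts):

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class RepartitionThenProcess {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cpu-streamz");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

            KStreamBuilder builder = new KStreamBuilder();

            // Read the original topic and immediately write every record into a
            // manually pre-created topic that already has more partitions.
            // through() writes to that topic and consumes it back, so all downstream
            // processing and state stores see data partitioned by the new scheme.
            KStream<String, String> repartitioned = builder
                    .<String, String>stream("original-topic")
                    .through("new_topic_with_more_partitions");

            // ... actual processing goes here ...
            repartitioned.mapValues(v -> v.toUpperCase()).to("output-topic");

            new KafkaStreams(builder, props).start();
        }
    }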

In general, however, it is recommended to over-partition your topics for production use cases, so that you never need to change the number of partitions later on. The overhead of over-partitioning is rather small and saves you a lot of hassle later on. This is a general recommendation if you work with Kafka -- it's not limited to Streams use cases.

One more remark:

Some people might suggest increasing the number of partitions of Kafka Streams' internal topics manually. First of all, this would be a hack and is not recommended, for the following reasons:

  1. It might be tricky to figure out what the right number is, as it depends on various factors (it's a Streams internal implementation detail).
  2. You also face the problem of breaking the partitioning scheme, as described in the paragraph above. Thus, your application most likely ends up in an inconsistent state.

In order to avoid an inconsistent application state, Streams does not delete any internal topics or change the number of partitions of internal topics automatically, but fails with the error message you reported. This ensures that the user is aware of all implications by doing the "cleanup" manually.

Btw: for the upcoming Kafka 0.10.2 this error message has been improved: https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L100-L103

Answered by Matthias J. Sax