I am using kafka-node (node client for kafka), using a consumer to retrieve messages about a topic. Unfortunately, I receive an "offsetOutOfRange" condition (the offsetOutOfRange callback is invoked). My application was working fine until the consumer lagged significantly from the producer, leaving a somewhat large gap between the earliest and latest offset. At this point I (perhaps incorrectly) assumed the consumer would be able to continue receiving messages (and hopefully catchup to the producer).
My kafka consumer client code is as follows:
:
:
var kafka = require('kafka-node');
var zookeeper = "10.0.1.201:2181";
var id = "embClient";
var Consumer = kafka.Consumer;
var client = new kafka.Client(zookeeper, id);
var consumer = new Consumer( client, [ { topic: "test", partition: 0 } ], { autoCommit: false } );
consumer.on('error', [error callback...]);
consumer.on('offsetOutOfRange', [offset error callback...]);
consumer.on('message', [message callback...]);
:
:
Am I doing something incorrectly, or am I missing something?
If not, I have a few questions:
(a) is there an acknowledged "best" way to write the client to gracefully handle this condition?
(b) why would this condition be raised? (I assume a client should have been able to continue reading messages where it left off, eventually (ideally) catching up...)
(c) do I need to write code/logic to handle this condition, and explicitly reposition the consumer offset to read? (this seems a bit cumbersome)...
Any help is appreciated.
I believe it's possible that the app tried to read messages that are no longer available in Kafka. Kafka deletes the old messages based on log.retention.* properties. Let's say you've send 1000 messages to Kafka. Because of retention, Kafka has deleted the first 500 messages. If your app tries to read the message 350 it will fail and it will raise offsetOutOfRange error. It could happened because of your consumer was so slow that the Kafka broker has already deleted messages before your consumer could process it. Or your consumer crashed but the offset of the last processed message was saved somewhere.
You can use the Offset class to retrieve the latest/earliest available offset (see the method fetch
) and update the consumer's offset. We use this method.
In general it's not easy to tell what should the app do when this situation occur because clearly something is very wrong.
Hope it helps, Lukáš
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With