I'm using kafka-node to consume messages from a specific Kafka topic. When I restart my Node server, it initializes my consumer as expected, but its default behavior is to start consuming from offset 0, while my goal is to receive only new messages (that is, to start consuming from the current offset). I couldn't find a way to achieve that in the API documentation. Does anyone know if it's supported?
Thanks!
I asked this question in the kafka-node GitHub issues (link) and got an answer. It is now supported (from v0.4.0). The following snippet worked for me:
var kafka = require('kafka-node');

var consumerClient = new kafka.Client('localhost:2181');

/* Print the latest offset (time: -1 requests the latest offset). */
var offset = new kafka.Offset(consumerClient);
offset.fetch([{ topic: 'myTopic', partition: 0, time: -1 }], function (err, data) {
    if (err) {
        console.error('Error fetching latest offset: ' + err);
        return;
    }
    var latestOffset = data['myTopic']['0'][0];
    console.log('Consumer current offset: ' + latestOffset);
});

/* Start consuming from the latest offset instead of offset 0. */
var consumer = new kafka.HighLevelConsumer(
    consumerClient,
    [
        { topic: 'myTopic', partition: 0, fromOffset: -1 }
    ],
    {
        autoCommit: false
    }
);
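The callback above indexes straight into the response, which throws if the topic or partition is missing from it. A minimal sketch of a more defensive read, assuming the response keeps the shape used above (`{ topic: { partition: [offset, ...] } }`); `readLatestOffset` is a hypothetical helper, not part of kafka-node, and the sample values are made up:

```javascript
// Hypothetical helper (not part of kafka-node): safely reads the first offset
// from a response shaped like { topic: { partition: [offset, ...] } }.
function readLatestOffset(data, topic, partition) {
    var partitions = data && data[topic];
    var offsets = partitions && partitions[String(partition)];
    if (!Array.isArray(offsets) || offsets.length === 0) {
        return null; // nothing reported for this topic/partition
    }
    return offsets[0];
}

// Sample response in the assumed shape (made-up values, no broker needed).
var sample = { myTopic: { '0': [42] } };
console.log('Latest offset: ' + readLatestOffset(sample, 'myTopic', 0));
```

Inside the `offset.fetch` callback, the helper would be called with `data` in place of `sample`, and a `null` result can be treated as "no offset available".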
Cheers!
A similar answer: this retrieves the latest offset of every partition of the topic, picks the highest value, and sets the consumer's offset (on partition 0 in this snippet) to that value minus 1, so that it consumes the last published message for the given topic.
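// Assumes `client`, `consumer`, and `topic` are already defined.
var offset = new kafka.Offset(client)
offset.fetchLatestOffsets([topic], (err, offsets) => {
    if (err) {
        console.log(`error fetching latest offsets ${err}`)
        return
    }
    // Find the highest latest offset across all partitions of the topic.
    var latest = 1
    Object.keys(offsets[topic]).forEach(o => {
        latest = offsets[topic][o] > latest ? offsets[topic][o] : latest
    })
    // Step back by one so the last published message is consumed.
    consumer.setOffset(topic, 0, latest - 1)
})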
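For a topic with several partitions, it may make more sense to compute a starting offset per partition rather than applying the overall maximum to partition 0. A rough sketch of that variant, assuming the result of `fetchLatestOffsets` has the shape `{ topic: { partition: offset } }` as used above; `lastMessageOffsets` is a hypothetical helper, not part of kafka-node, and the sample values are made up:

```javascript
// Hypothetical helper (not part of kafka-node): for each partition, returns
// the offset of the last published message (latest - 1, never below 0).
function lastMessageOffsets(offsets, topic) {
    var byPartition = offsets[topic] || {};
    return Object.keys(byPartition).map(function (partition) {
        return {
            partition: Number(partition),
            offset: Math.max(byPartition[partition] - 1, 0)
        };
    });
}

// Sample data in the assumed shape (made-up values, no broker needed).
var sample = { myTopic: { '0': 10, '1': 0, '2': 7 } };
lastMessageOffsets(sample, 'myTopic').forEach(function (p) {
    console.log('partition ' + p.partition + ' -> offset ' + p.offset);
    // With a real consumer: consumer.setOffset('myTopic', p.partition, p.offset)
});
```

Clamping at 0 avoids a negative offset for partitions that have no messages yet.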