
kafka-node start consume from last offset

I'm using kafka-node to consume messages from a specific Kafka topic. When I restart my Node server, it initializes my consumer as expected, but its default behavior is to start consuming from offset 0, while my goal is to receive only new messages (i.e., start consuming from the current offset). I couldn't find a way to achieve that in the API documentation. Does anyone know if it's supported?

Thanks!

ItayB asked Jun 21 '16 16:06



2 Answers

I asked this question in the kafka-node GitHub issues (link) and got an answer. It is now supported (from v0.4.0). The following snippet worked for me:

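var kafka = require('kafka-node');

var consumerClient = new kafka.Client('localhost:2181');

/* Print latest offset. */
var offset = new kafka.Offset(consumerClient);

// time: -1 requests the latest offset of the partition
offset.fetch([{ topic: 'myTopic', partition: 0, time: -1 }], function (err, data) {
        if (err) {
            console.error("Failed to fetch latest offset: " + err);
            return;
        }
        var latestOffset = data['myTopic']['0'][0];
        console.log("Consumer current offset: " + latestOffset);
});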

var consumer = new kafka.HighLevelConsumer(
        consumerClient,
        [
            { topic: 'myTopic', partition: 0, fromOffset: -1 }
        ],
        {
            autoCommit: false
        }
);
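
Note that the offset.fetch callback runs asynchronously, so the latest offset is only known once the callback fires. If you need that value before continuing, a small helper along these lines may be useful. This is only a sketch: withLatestOffset and fakeOffsetApi are hypothetical names that are not part of kafka-node, and the stub merely imitates the response shape used above (data[topic][partition][0]) so the example runs without a broker.

```javascript
// Hypothetical helper: fetch the latest offset of one partition and hand it to a callback.
// `offsetApi` is anything exposing fetch(payloads, cb) the way kafka.Offset does above.
function withLatestOffset(offsetApi, topic, partition, callback) {
  offsetApi.fetch([{ topic: topic, partition: partition, time: -1 }], function (err, data) {
    if (err) {
      return callback(err);
    }
    var perPartition = data && data[topic];
    var list = perPartition && perPartition[String(partition)];
    if (!list || list.length === 0) {
      return callback(new Error('No offset returned for ' + topic + '/' + partition));
    }
    callback(null, list[0]);
  });
}

// Stand-in for kafka.Offset (an assumption for illustration only); always reports offset 42.
var fakeOffsetApi = {
  fetch: function (payloads, cb) {
    var p = payloads[0];
    var data = {};
    data[p.topic] = {};
    data[p.topic][p.partition] = [42];
    cb(null, data);
  }
};

withLatestOffset(fakeOffsetApi, 'myTopic', 0, function (err, latest) {
  if (err) {
    console.error(err.message);
    return;
  }
  // With a real client, this is where the consumer could be created or repositioned.
  console.log('Latest offset: ' + latest);
});
```

With a real client you would pass the kafka.Offset instance instead of the stub.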

Cheers!

ItayB answered Nov 08 '22 00:11



Similar answer: this retrieves the latest offset of every partition of the topic, takes the highest value, and sets the consumer's offset on partition 0 to that value minus 1, so that the last published message is consumed again.

// client, consumer and topic are assumed to be defined already
var offset = new kafka.Offset(client)
offset.fetchLatestOffsets([topic], (err, offsets) => {
    if (err) {
        console.log(`error fetching latest offsets ${err}`)
        return
    }
    // offsets[topic] maps each partition id to its latest offset; take the highest
    var latest = Math.max(...Object.values(offsets[topic]))
    // step back one message, never below 0; only partition 0 is repositioned
    consumer.setOffset(topic, 0, Math.max(latest - 1, 0))
})
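
For a topic with more than one partition, it may be safer to reposition each partition from its own latest offset rather than from the highest one. A minimal sketch, assuming the offsets[topic] shape used above (partition id mapped to latest offset); startOffsets and its replay parameter are hypothetical names, not part of kafka-node:

```javascript
// Compute, for every partition, the offset at which to start consuming.
// `latestByPartition` has the shape of offsets[topic], e.g. { '0': 12, '1': 7 }.
// `replay` is how many already-published messages to re-read
// (1 = start at the last message, 0 = only new messages).
function startOffsets(latestByPartition, replay) {
  return Object.keys(latestByPartition).map(function (partition) {
    return {
      partition: Number(partition),
      // never go below offset 0 on an empty or short partition
      offset: Math.max(latestByPartition[partition] - replay, 0)
    };
  });
}

// Sample data for illustration only.
var plan = startOffsets({ '0': 12, '1': 7, '2': 0 }, 1);
console.log(plan);
```

Each entry could then be passed to consumer.setOffset(topic, entry.partition, entry.offset), assuming the consumer is subscribed to those partitions.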
Vincent van Dam answered Nov 08 '22 00:11
