
How to get last offset Number directly or to filter the offsets in NodeJS Kafka Consumer

Tags:

node.js

I am using a simple consumer/producer pair with Kafka in Node.js. My producer sends messages, and my consumer receives them without problems. The producer and consumer code is below. In the consumer, I was expecting offset.fetch() to give me all of today's offset IDs, but it does not. Please explain how I can get results from this method, and also mention a method that directly gives the last offset number for any partition of the topic. I would also like to know how to filter offsets in the incoming stream, e.g. how can I get only the last 20 messages in my consumer?

My producer is:

var kafka = require('kafka-node');
var Producer = kafka.Producer;
var Client = kafka.Client;
var client = new Client('localhost:2181');
var producer = new Producer(client);
producer.on('ready', function () {
    producer.send([
        { topic: 'test', key:'key1', partition: 0, messages: ['banana','carrot','lemon','apple','melon','kiwi','mango','avacado'], attributes: 0}
        ], function (err, result) {
        console.log(err || result);
        process.exit();
    });
});

My Consumer is:

var kafka = require('kafka-node');
var Consumer = kafka.Consumer;
var client = new kafka.Client('localhost:2181');
var offset = new kafka.Offset(client);
offset.fetch([
    { topic: 'test'  }
], function (err, data) {
    console.log(data);

});
var consumer = new Consumer(
        client,
        [
            { topic: 'test', partition: 0}
        ],

        {  autoCommit: false, autoCommitIntervalMs: 5000,  fetchMaxWaitMs: 100, fromOffset: true, fetchMinBytes: 1, fetchMaxBytes: 1024 * 10
        }
    );
consumer.on('message', function (message) {
       console.log(message);
});
Hafsa Asif asked Nov 27 '25 10:11


2 Answers

I was also in a situation where I needed to get messages starting from the current time. I did it by fetching the offset of the latest message: I passed the current time and the number of offset values to offset.fetch([ ]). I then passed the returned offset to the Consumer API and retrieved messages from that point on.

offset.fetch([
    { topic: topicname, partition: partition, time: Date.now(), maxNum: 1 }
], function (err, data) {
    if (err) {
        return console.error(err);
    }
    // data looks like { 'topicname': { 'partition': [999] } }
    var consumer = new Consumer(
        client,
        [
            // Start consuming at the offset returned by offset.fetch()
            { topic: topicname, partition: partition, offset: data[topicname][partition][0] }
        ],
        {
            autoCommit: false,
            fromOffset: true
        }
    );
    consumer.on('message', function (message) {
        console.log(message);
    });
});

This will give you messages from the current time onward. To go further back, you can compute the time 20 minutes earlier and fetch the offset of the message that occurred at that time.
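
As a rough sketch of the "20 minutes before" idea, the payload for offset.fetch() can be built by subtracting the interval from the current time in milliseconds. The helper name offsetPayloadMinutesAgo and its optional `now` argument are hypothetical and not part of kafka-node. How precisely the broker maps a timestamp to an offset may depend on the Kafka version, so treat the returned offset as approximate.

```javascript
// Hypothetical helper (not part of kafka-node): builds an offset.fetch()
// payload whose `time` lies `minutes` minutes before `now`.
// `now` is optional and defaults to the current time in milliseconds.
function offsetPayloadMinutesAgo(topic, partition, minutes, now) {
    var reference = (now === undefined) ? Date.now() : now;
    return {
        topic: topic,
        partition: partition,
        time: reference - minutes * 60 * 1000, // minutes converted to milliseconds
        maxNum: 1
    };
}

// Usage with an Offset instance like the one above (assumed to exist as `offset`):
// offset.fetch([offsetPayloadMinutesAgo('test', 0, 20)], function (err, data) {
//     console.log(err || data);
// });
```

The returned offset can then be passed to the Consumer payload in the same way as in the code above.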

anand answered Nov 30 '25 05:11


As a follow-up to @anand's answer: in the offset payloads, you can set the time field to -1 or -2 instead of a timestamp, with the following results.

// When time is -2 (earliest), your response will be {topic: {partition: [lowestOffset]}}
{ topic: topicname, partition: partition, time: -2, maxNum: 1 }

// When time is -1 (latest), your response will start with the highest offset: {topic: {partition: [highestOffset]}}
// With a larger maxNum, lower offsets may follow it in the array.
{ topic: topicname, partition: partition, time: -1, maxNum: 1 }

The '-1' was especially helpful for me as I can just set my offset equal to the highest known offset for testing.
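
For the "last 20 messages" part of the question, one possible approach is to fetch the latest offset (time: -1) and the earliest offset (time: -2) and start consuming at latest minus 20, clamped to the earliest offset. The sketch below assumes that the latest offset is the offset of the next message to be written, and that `offsetClient` behaves like a kafka-node Offset instance. Both function names are hypothetical and not part of kafka-node.

```javascript
// Hypothetical helper: offset at which the last `n` messages of a partition begin.
// Never goes below the earliest offset still available on the broker.
function startOffsetForLastN(earliest, latest, n) {
    return Math.max(earliest, latest - n);
}

// Hypothetical wrapper: `offsetClient` is assumed to expose
// fetch(payloads, callback) like kafka-node's Offset.
function fetchStartOffsetForLastN(offsetClient, topic, partition, n, callback) {
    // time: -1 asks for the latest offset of the partition
    offsetClient.fetch([
        { topic: topic, partition: partition, time: -1, maxNum: 1 }
    ], function (err, latestData) {
        if (err) { return callback(err); }
        // time: -2 asks for the earliest offset of the partition
        offsetClient.fetch([
            { topic: topic, partition: partition, time: -2, maxNum: 1 }
        ], function (err, earliestData) {
            if (err) { return callback(err); }
            var latest = latestData[topic][partition][0];
            var earliest = earliestData[topic][partition][0];
            callback(null, startOffsetForLastN(earliest, latest, n));
        });
    });
}
```

The resulting value can be used as the offset field of the Consumer payload together with fromOffset: true, as in @anand's answer.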

CBP answered Nov 30 '25 04:11


