I am using a simple consumer/producer in Kafka NodeJS. My producer is sending messages which I am easily getting in consumer. Producer and Consumer code is below. In Consumer, I ws expecting that offset.fetch() gives me all offset IDs of today, but it doesnot. KIndly guide me that how I can get results from this method and also mention a method that directly gives the last offset number in the topic of any partition. I also want to know that how can I filter offsets in the coming streaming. e.g: If I want to get only last 20 messages in my consumer?
My producer is:
var kafka = require('kafka-node');
var Producer = kafka.Producer;
var Client = kafka.Client;
var client = new Client('localhost:2181');
var producer = new Producer(client);
producer.on('ready', function () {
producer.send([
{ topic: 'test', key:'key1', partition: 0, messages: ['banana','carrot','lemon','apple','melon','kiwi','mango','avacado'], attributes: 0}
], function (err, result) {
console.log(err || result);
process.exit();
});
});
My Consumer is:
var kafka = require('kafka-node');
var Consumer = kafka.Consumer;
var client = new kafka.Client('localhost:2181');
var offset = new kafka.Offset(client);
offset.fetch([
{ topic: 'test' }
], function (err, data) {
console.log(data);
});
var consumer = new Consumer(
client,
[
{ topic: 'test', partition: 0}
],
{ autoCommit: false, autoCommitIntervalMs: 5000, fetchMaxWaitMs: 100, fromOffset: true, fetchMinBytes: 1, fetchMaxBytes: 1024 * 10
}
);
consumer.on('message', function (message) {
console.log(message);
});
I was also in a situation where I need to get massage from the current time.I did it by fetching offset value of latest message, for which I passed current time and number of offset values to offset.fetch([ ]) . Then by getting the offset value I just passed it to Consumer API and retrieved message from that time.
offset.fetch([
{ topic: topicname, partition: partition, time: Date.now(), maxNum: 1 }
], function (err, data) {
// data
// { 'topicname': { 'partition': [999] } }
var consumer = new Consumer(
client,
[
{ topic:topicname,
partition:partition,
offset:data[topicname][partition][0]}
],
{
autoCommit: false,
fromOffset: true,
}
);
consumer.on('message', function (message) {
console.log(message);
});
});
this will give you messages from the current time. you can calculate time 20 min before and can get offset value of the message occurred 20 min before.
As a follow up to @anand's answer. In the offset payloads, you can change the time field to -1 or -2, instead of a date for the following results.
// When time is -2, your response will be {topic: {partition: [lowestOffset]}
{ topic: topicname, partition: partition, time: -1, maxNum: 1 }
// When time is -1, your response will be {topic: {partition: [highestOffset, lowestOffset]}
{ topic: topicname, partition: partition, time: -2, maxNum: 1 }
The '-1' was especially helpful for me as I can just set my offset equal to the highest known offset for testing.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With