I have written a Kafka Producer in NodeJS and Kafka Consumer in Java Maven. My topic is "test" which was created by the following command:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Producer in NodeJS:
var kafka = require('kafka-node');
var Producer = kafka.Producer;
var Client = kafka.Client;
var client = new Client('localhost:2181');
var producer = new Producer(client);
producer.on('ready', function () {
producer.send([
{ topic: 'test', partition: 0, messages: ["This is the zero message I am sending from Kafka to Spark"], attributes: 0},
{ topic: 'test', partition: 1, messages: ["This is the first message I am sending from Kafka to Spark"], attributes: 0},
{ topic: 'test', partition: 2, messages: ["This is the second message I am sending from Kafka to Spark"], attributes: 0}
], function (err, result) {
console.log(err || result);
process.exit();
});
});
When I send two messages from NodeJS producer, it successfully consumed by Java Consumer. But when I send three or more messages from NodeJS producer, it gives me following error:
{ [BrokerNotAvailableError: Could not find the leader] message: 'Could not find the leader' }
I want to ask that how can I set LEADER to any message in a topic "test". Or what should be the solution for the issue.
Instead of partition
use partitions
(plural key name).
for example:
producer.on('ready', function () {
producer.send([
{ topic: 'test', partitions: 0, messages: ["This is the zero message I am sending from Kafka to Spark"], attributes: 0},
{ topic: 'test', partitions: 1, messages: ["This is the first message I am sending from Kafka to Spark"], attributes: 0},
{ topic: 'test', partitions: 2, messages: ["This is the second message I am sending from Kafka to Spark"], attributes: 0}
], function (err, result) {
console.log(err || result);
process.exit();
});
});
The topic was created with 1 partition, however at the producer end you are trying to send messages to 3 partitions, logically Kafka is not supposed to find leader for the other partitions and should throw this exception.
There is a bug that can cause this in the current version of kafka-node
https://github.com/SOHU-Co/kafka-node/issues/354
HighLevelProducer with KeyedPartitioner fails on first send #354 When using KeyedParitioner with the HighLevelProducer the first send fails with BrokerNotAvailableError: Could not find the leader Consecutive sends work perfectly.
Also see https://www.npmjs.com/package/kafka-node#highlevelproducer-with-keyedpartitioner-errors-on-first-send
Which recommends
Call client.refreshMetadata() before sending the first message.
This is how I've done that
// Refresh metadata required for the first message to go through
// https://github.com/SOHU-Co/kafka-node/pull/378
client.refreshMetadata([topic], (err) => {
if (err) {
console.warn('Error refreshing kafka metadata', err);
}
});
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With