BrokerNotAvailableError: Could not find the leader Exception while Spark Streaming

Tags: apache-kafka

I have written a Kafka producer in NodeJS and a Kafka consumer in Java (Maven). My topic is "test", which was created with the following command:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Producer in NodeJS:

var kafka = require('kafka-node');
var Producer = kafka.Producer;
var Client = kafka.Client;
var client = new Client('localhost:2181');
var producer = new Producer(client);

producer.on('ready', function () {
    producer.send([
        { topic: 'test', partition: 0, messages: ["This is the zero message I am sending from Kafka to Spark"], attributes: 0},
        { topic: 'test', partition: 1, messages: ["This is the first message I am sending from Kafka to Spark"], attributes: 0},
        { topic: 'test', partition: 2, messages: ["This is the second message I am sending from Kafka to Spark"], attributes: 0}
        ], function (err, result) {
        console.log(err || result);
        process.exit();
    });
});

When I send two messages from the NodeJS producer, they are successfully consumed by the Java consumer. But when I send three or more messages from the NodeJS producer, I get the following error:

{ [BrokerNotAvailableError: Could not find the leader] message: 'Could not find the leader' }

How can I set a leader for the messages in the topic "test"? Or what else would solve this issue?

Hafsa Asif asked Jul 23 '15 13:07


3 Answers

Instead of partition, use partitions (the plural key name).

For example:

producer.on('ready', function () {
  producer.send([
    { topic: 'test', partitions: 0, messages: ["This is the zero message I am sending from Kafka to Spark"], attributes: 0},
    { topic: 'test', partitions: 1, messages: ["This is the first message I am sending from Kafka to Spark"], attributes: 0},
    { topic: 'test', partitions: 2, messages: ["This is the second message I am sending from Kafka to Spark"], attributes: 0}
  ], function (err, result) {
    console.log(err || result);
    process.exit();
  });
});

Miguel Veces answered Oct 23 '22 06:10


The topic was created with 1 partition, but the producer is trying to send messages to 3 partitions (0, 1, and 2). Partitions 1 and 2 do not exist, so Kafka cannot find a leader for them and throws this exception.
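
As a rough illustration of that mismatch, the payloads can be checked against the topic's partition count before sending. This is only a sketch: findInvalidPayloads is a hypothetical helper, not part of kafka-node, and the partition count (1, from the --partitions 1 flag in the question) is passed in by hand rather than read from the broker. It also assumes that a payload without a partition key goes to partition 0.

```javascript
// Sketch: flag payloads that target partitions the topic does not have.
// `findInvalidPayloads` is a hypothetical helper, not a kafka-node API.
function findInvalidPayloads(payloads, partitionCount) {
  return payloads.filter(function (p) {
    // Assumption: a missing `partition` key is treated as partition 0
    var partition = p.partition === undefined ? 0 : p.partition;
    return !Number.isInteger(partition) ||
      partition < 0 ||
      partition >= partitionCount;
  });
}

var payloads = [
  { topic: 'test', partition: 0, messages: ['zero'] },
  { topic: 'test', partition: 1, messages: ['first'] },
  { topic: 'test', partition: 2, messages: ['second'] }
];

// The topic "test" was created with --partitions 1, so only partition 0 exists;
// the payloads for partitions 1 and 2 are flagged.
var invalid = findInvalidPayloads(payloads, 1);
console.log(invalid.map(function (p) { return p.partition; }));
```

Either sending every message to partition 0 or recreating the topic with 3 partitions would make all three payloads pass this check.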

user2720864 answered Oct 23 '22 05:10


There is a bug in the current version of kafka-node that can cause this:

https://github.com/SOHU-Co/kafka-node/issues/354

HighLevelProducer with KeyedPartitioner fails on first send #354

When using KeyedPartitioner with the HighLevelProducer, the first send fails with BrokerNotAvailableError: Could not find the leader. Consecutive sends work perfectly.

Also see https://www.npmjs.com/package/kafka-node#highlevelproducer-with-keyedpartitioner-errors-on-first-send

It recommends:

Call client.refreshMetadata() before sending the first message.

This is how I've done that:

    // Refresh metadata required for the first message to go through
    // https://github.com/SOHU-Co/kafka-node/pull/378
    client.refreshMetadata([topic], (err) => {
        if (err) {
            console.warn('Error refreshing kafka metadata', err);
        }
    });
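
One possible variation, sketched here under assumptions rather than taken from the answer or the kafka-node docs, is to send only after the refresh callback has fired, so the first message cannot go out before the metadata arrives. sendAfterRefresh is a hypothetical helper, and fakeClient and fakeProducer are stand-ins that only mimic the refreshMetadata(topics, callback) and send(payloads, callback) shapes so the sketch runs without a broker.

```javascript
// Sketch: refresh metadata first, then send once the refresh has completed.
// `sendAfterRefresh` is a hypothetical helper; `client` and `producer` are
// expected to look like kafka-node's Client and Producer.
function sendAfterRefresh(client, producer, topic, payloads, callback) {
  client.refreshMetadata([topic], function (err) {
    if (err) {
      // Same policy as the snippet above: warn, then still attempt the send
      console.warn('Error refreshing kafka metadata', err);
    }
    producer.send(payloads, callback);
  });
}

// Stand-ins so the sketch runs without a broker; swap in the real objects.
var calls = [];
var fakeClient = {
  refreshMetadata: function (topics, cb) {
    calls.push('refresh:' + topics.join(','));
    cb(null);
  }
};
var fakeProducer = {
  send: function (payloads, cb) {
    calls.push('send:' + payloads.length);
    cb(null, { ok: true });
  }
};

var sendResult;
sendAfterRefresh(
  fakeClient,
  fakeProducer,
  'test',
  [{ topic: 'test', partition: 0, messages: ['hello'] }],
  function (err, result) {
    sendResult = err || result;
    console.log(calls); // the refresh is recorded before the send
  }
);
```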

ubershmekel answered Oct 23 '22 04:10