
Apache Kafka LEADER_NOT_AVAILABLE

I'm running into an issue with Apache Kafka that I don't understand. I subscribe to a topic in my broker called "topic-received". This is the code:

protected String readResponse(final String idMessage) {
    if (props != null) {
        kafkaClient = new KafkaConsumer<>(props);
        logger.debug("Subscribed to topic-received");
        kafkaClient.subscribe(Arrays.asList("topic-received"));
        logger.debug("Waiting for reading : topic-received");
        ConsumerRecords<String, String> records =
                kafkaClient.poll(kafkaConfig.getRead_timeout());

        if (records != null) {
            for (ConsumerRecord<String, String> record : records) {
                logger.debug("Returned result : " + record.value());
                return record.value();
            }
        }
    }
    return null;
}

While this is happening, I send a message to "topic-received" from another part of the application. The code is as follows:

private void sendMessageToKafkaBroker(String idTopic, String value) {
    Producer<String, String> producer = null;
    try {
        producer = new KafkaProducer<String, String>(mapProperties());
        ProducerRecord<String, String> producerRecord =
                new ProducerRecord<String, String>("topic-received", value);
        producer.send(producerRecord);
        logger.info("Sent value " + value + " to topic-received");
    } catch (ExceptionInInitializerError eix) {
        eix.printStackTrace();
    } catch (KafkaException ke) {
        ke.printStackTrace();
    } finally {
        if (producer != null) {
            producer.close();
        }
    }
}

The first time I try with the topic "topic-received", I get a warning like this:

"WARN 13164 --- [nio-8085-exec-3] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 1 : {topic-received=LEADER_NOT_AVAILABLE}"

But if I try again with the same topic "topic-received", it works and no warning appears. That doesn't help me, though, because each time I have to listen on and send to a new topic (identified by a String identifier, e.g. 12Erw45-2345Saf-234DASDFasd).

Searching for LEADER_NOT_AVAILABLE on Google, I found some people suggesting adding the following lines to server.properties:

host.name=127.0.0.1
advertised.port=9092
advertised.host.name=127.0.0.1

But that doesn't work for me (I don't know why).

I have also tried creating the topic beforehand, with the following code:

private void createTopic(String idTopic) {
    String zookeeperConnect = "localhost:2181";
    ZkClient zkClient = new ZkClient(zookeeperConnect, 10000, 10000,
            ZKStringSerializer$.MODULE$);
    ZkUtils zkUtils = new ZkUtils(zkClient,
            new ZkConnection(zookeeperConnect), false);
    if (!AdminUtils.topicExists(zkUtils, idTopic)) {
        AdminUtils.createTopic(zkUtils, idTopic, 2, 1, new Properties(), null);
        logger.debug("Created topic " + idTopic + " by super user");
    } else {
        logger.debug("topic " + idTopic + " already exists");
    }
}

There is no error, but the consumer still just keeps listening until the timeout.

I have reviewed the broker properties to see whether anything there helps, but I haven't found anything clear enough. The props that I use for reading are:

    props = new Properties();
    props.put("bootstrap.servers", kafkaConfig.getBootstrap_servers());
    props.put("key.deserializer", kafkaConfig.getKey_deserializer());
    props.put("value.deserializer", kafkaConfig.getValue_deserializer());
    props.put("key.serializer", kafkaConfig.getKey_serializer());
    props.put("value.serializer", kafkaConfig.getValue_serializer());
    props.put("group.id",kafkaConfig.getGroupId());

and, for sending:

    Properties props = new Properties();
    props.put("bootstrap.servers",
            kafkaConfig.getHost() + ":" + kafkaConfig.getPort());
    props.put("group.id", kafkaConfig.getGroup_id());
    props.put("enable.auto.commit", kafkaConfig.getEnable_auto_commit());
    props.put("auto.commit.interval.ms",
            kafkaConfig.getAuto_commit_interval_ms());
    props.put("session.timeout.ms", kafkaConfig.getSession_timeout_ms());
    props.put("key.deserializer", kafkaConfig.getKey_deserializer());
    props.put("value.deserializer", kafkaConfig.getValue_deserializer());
    props.put("key.serializer", kafkaConfig.getKey_serializer());
    props.put("value.serializer", kafkaConfig.getValue_serializer());

Any clue? Why is repeating the request after an error the only way I can consume messages from the broker on this topic?

Thanks in advance

Yago asked Sep 27 '16 07:09



1 Answer

This happens when you try to produce messages to a topic that doesn't exist yet.

PLEASE NOTE: In some Kafka installations, the broker automatically creates the topic when it doesn't exist (the auto.create.topics.enable broker setting); that explains why you see the issue only once, at the very beginning.
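
If each request really has to use a brand-new topic, one possible workaround is to automate the "try again" step instead of doing it by hand. Below is a minimal sketch of a bounded retry with a fixed pause, assuming the first attempt fails only because the topic is still being created. The class TransientRetry and the method callWithRetry are hypothetical names, not part of the Kafka API, and the action passed in stands for whatever send-and-read round trip your code performs.

```java
import java.util.function.Supplier;

/**
 * Hypothetical helper (not part of Kafka): retries an action a bounded
 * number of times, pausing between attempts, so that a failure caused by
 * a topic that is still being created gets a second chance.
 */
final class TransientRetry {

    private TransientRetry() {
    }

    /**
     * Runs the action until it returns normally or maxAttempts is reached.
     * Only RuntimeExceptions are retried; Kafka's KafkaException is one of them.
     */
    static <T> T callWithRetry(Supplier<T> action, int maxAttempts, long backoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and fall through to the pause
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMillis);
                } catch (InterruptedException ie) {
                    // Restore the interrupt flag and stop retrying.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting to retry", ie);
                }
            }
        }
        // Every attempt failed: rethrow the last failure seen.
        throw last;
    }
}
```

The action should throw when the round trip fails, for example when the read returns nothing before the timeout, so that the helper knows it has to try again. Creating the topic ahead of time and waiting until it is ready is usually the cleaner fix; the retry only hides the startup delay.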

Luca Tampellini answered Sep 30 '22 14:09
