
Kafka: No message seen on console consumer after message sent by Java Producer

I'm new to Kafka. I created a Java producer on my local machine and set up a Kafka broker on another machine, M2, on the same network (I can ping it, SSH into it, and connect to it). On the producer side, the Eclipse console prints "Message sent", but when I check the console consumer on M2, none of those messages appear.

My Java producer code is:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;


import java.util.HashMap;
import java.util.Map;

public class KafkaMessageProducer  {

    /**
     * @param args
     */
    public static void main(String[] args) {

        KafkaMessageProducer reportObj = new KafkaMessageProducer();
        reportObj.send();

    }

    public void send(){

        Map<String, Object> config = new HashMap<String, Object>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "135.113.133.60:9092");
        config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);
        int maxMessages = 5;
        int count = 0;
        while(count < maxMessages){
            producer.send(new ProducerRecord<String, String>("test", "msg", "message --- #"+count++));
            System.out.println("Message sent.." + count);
        }
        producer.close();
    }

}

Can you please tell me where I'm going wrong? I can send messages locally on M2 with the console producer. Note: changing the IP address to the broker's full hostname does not help; the issue stays the same.

Update: I think the producer is able to connect to the Kafka broker and send the messages, but the broker does not pass them on to the consumer. If I point the producer at ZooKeeper's address and port instead (ZooKeeper runs on the same node as the Kafka broker) and watch ZooKeeper's log, I can see that it receives the producer's connection attempt and then rejects the session.

Update 2: I built the producer as a jar and ran it on M2 itself, and it worked. So something seems to be wrong with the way the producer connects to the Kafka broker from a remote machine. I'm not sure yet what the problem is.

asked May 22 '16 04:05 by user2441441


3 Answers

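I finally found the answer and am posting it here in case anyone else hits the same issue. When you connect to the broker remotely, set the broker's advertised host name (advertised.host.name in server.properties; newer Kafka versions use advertised.listeners instead) to an address the remote client can resolve and reach. The broker hands this address back to clients in its metadata, and the producer uses it for the actual sends, so a name that only resolves on M2 would explain why the jar worked locally but not from my machine. This worked for me.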
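To check from the producer machine whether the address the broker advertises is actually usable there, a small sketch like the one below may help. It only tests whether a host name resolves and whether a TCP connection to the port succeeds; it does not speak the Kafka protocol. The class name BrokerReachability and the method check are made up for illustration, and the default host and port are just the ones from the question.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;

// Hypothetical helper, not part of Kafka: a plain DNS + TCP reachability check.
class BrokerReachability {

    /** Returns UNRESOLVABLE, UNREACHABLE or REACHABLE for the given host and port. */
    static String check(String host, int port, int timeoutMs) {
        InetAddress address;
        try {
            // Step 1: can this machine resolve the name at all?
            address = InetAddress.getByName(host);
        } catch (UnknownHostException e) {
            return "UNRESOLVABLE";
        }
        try (Socket socket = new Socket()) {
            // Step 2: can this machine open a TCP connection to the port?
            socket.connect(new InetSocketAddress(address, port), timeoutMs);
            return "REACHABLE";
        } catch (IOException e) {
            return "UNREACHABLE";
        }
    }

    public static void main(String[] args) {
        // Defaults are the values from the question; pass the advertised host instead.
        String host = args.length > 0 ? args[0] : "135.113.133.60";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " -> " + check(host, port, 3000));
    }
}
```

Run it on the producer machine and pass the host name the broker advertises as the first argument. UNRESOLVABLE for that name would point at the advertised host setting (or at DNS and the hosts file on the client) rather than at the producer code.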

answered Nov 07 '22 02:11 by user2441441


Just as an idea for debugging: try producer.send(/* record */).get(), that is, wait for the result of the Future returned by send(). send() is asynchronous, so there may be an exception on the producer side that is simply being ignored in the background.
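A minimal sketch of that idea, written against java.util.concurrent.Future so it runs without a broker. The class name SendCheck and the method describe are invented for this example; the CompletableFuture objects in main are only stand-ins for what producer.send(record) would return.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: waits for an asynchronous send and reports what happened.
class SendCheck {

    /** Blocks up to timeoutMs on the Future and describes the outcome as text. */
    static String describe(Future<?> pending, long timeoutMs) {
        try {
            Object result = pending.get(timeoutMs, TimeUnit.MILLISECONDS);
            return "OK: " + result;
        } catch (ExecutionException e) {
            // The send failed in the background; the real cause is wrapped inside.
            return "FAILED: " + e.getCause();
        } catch (TimeoutException e) {
            return "TIMED OUT after " + timeoutMs + " ms";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "INTERRUPTED";
        }
    }

    public static void main(String[] args) {
        // Stand-ins for producer.send(record); the values are made up.
        CompletableFuture<String> ok = CompletableFuture.completedFuture("test-0@42");
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unreachable"));
        CompletableFuture<String> stuck = new CompletableFuture<>(); // never completes

        System.out.println(describe(ok, 1000));
        System.out.println(describe(failed, 1000));
        System.out.println(describe(stuck, 100));
    }
}
```

With the real producer the call would look like SendCheck.describe(producer.send(record), 10000). Blocking on every send gives up the producer's batching, so this is meant for debugging rather than for production code.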

answered Nov 07 '22 02:11 by t6nn


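You can use code like the following to read the metadata for the Kafka topic and check what the broker reports for it. Note that the metadata describes the topic's partitions and their leader brokers rather than the messages themselves, but it still helps with debugging because it shows the host names the broker hands out to clients.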

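import java.util.ArrayList;
import java.util.List;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

// broker and topic are assumed to be defined elsewhere; the last argument is the client id.
SimpleConsumer simpleConsumer = new SimpleConsumer(broker.host(), broker.port(), 100000,
      64 * 1024, "your_group_id");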
List<String> topics = new ArrayList<>();
topics.add(topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);

TopicMetadataResponse resp = simpleConsumer.send(req);
if (resp.topicsMetadata().size() != 1) {
  throw new RuntimeException("Expected one metadata for topic "
      + topic + " found " + resp.topicsMetadata().size());
}

TopicMetadata topicMetaData = resp.topicsMetadata().get(0);

answered Nov 07 '22 01:11 by yuyang