Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Simple Kafka Consumer not receiving messages

I am a newbie to Kafka and running a simple kafka consumer/producer example as given on KafkaConsumer and KafkaProducer. When I am running consumer from terminal, consumer is receiving messages but I am not able to listen using Java code. I have searched for similar issues on StackoverFlow also (Links: Link1, Link2) and tried that solutions but nothing seems to be working for me. Kafka Version: kafka_2.10-0.10.2.1 and corresponding maven dependency is used in pom.

Java Code for producer and consumer:

public class SimpleProducer {
public static void main(String[] args) throws InterruptedException {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9094");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);
    for(int i = 0; i < 10; i++)
        producer.send(new ProducerRecord<String, String>("topic3", Integer.toString(i), Integer.toString(i)));

    producer.close();

}}

public class SimpleConsumer {

public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9094");
    props.put("group.id", "test");
    props.put("zookeeper.connect", "localhost:2181");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("topic3", "topic2"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}}

Starting kafka: bin/kafka-server-start.sh config/server.properties (I have already set port, brokerid in properties file)

like image 350
mohiitg Avatar asked Jun 26 '17 13:06

mohiitg


People also ask

How do I receive messages in Kafka topic?

You can use a KafkaConsumer node in a message flow to subscribe to a specified topic on a Kafka server. The KafkaConsumer node then receives messages that are published on the Kafka topic, as input to the message flow.

How do you ensure consumers receive messages in the correct order with Kafka?

How to Ensure the Order of Messages. In Kafka, order can only be guaranteed within a partition. This means that if messages were sent from the producer in a specific order, the broker will write them to a partition and all consumers will read from that in the same order.

What if consumer fails in Kafka?

If the consumer fails after saving the offsets back to Kafka but before writing the data to the database, it will skip these records next time it runs and data will be lost.

How do consumers consumes messages in Kafka?

Kafka assigns the partitions of a topic to the consumers in a group. Specify Read latest as the option for consuming messages. The latest messages are read starting at the time at which the integration is activated.


1 Answers

First check what all the groups are available by using :

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

Then check for which group your topic belongs by using below cmd :

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <your group name> --describe

Once you find your topic and associated group name (just replace group.id with your group if it not belongs to default group) then try with below prop and let me know if it works :

  props.put("bootstrap.servers", "localhost:9092");
  props.put("group.id", "test-consumer-group"); // default topic name
  props.put("enable.auto.commit", "true");
  props.put("auto.commit.interval.ms", "1000");
  props.put("session.timeout.ms", "30000");
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
  KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

  //Kafka Consumer subscribes list of topics here.
  consumer.subscribe(Arrays.asList(topicName));  // replace you topic name

  //print the topic name

  java.util.Map<String,java.util.List<PartitionInfo>> listTopics = consumer.listTopics();
  System.out.println("list of topic size :" + listTopics.size());

  for(String topic : listTopics.keySet()){
      System.out.println("topic name :"+topic);
  }
like image 189
Sanjay Avatar answered Sep 28 '22 10:09

Sanjay