 

Why does my consumer hang while consuming messages from Kafka on DC/OS using the Java client API?

I installed Kafka on a DC/OS (Mesos) cluster on AWS, enabled three brokers, and created a topic called "topic1":

dcos kafka topic create topic1 --partitions 3 --replication 3

Then I wrote a Producer class to send messages and a Consumer class to receive them.

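import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class Producer {
    public static void sendMessage(String msg) throws InterruptedException, ExecutionException {
        Map<String, Object> producerConfig = new HashMap<>();
        System.out.println("Setting producer config.");
        // One host:port per broker
        producerConfig.put("bootstrap.servers",
                "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");

        ByteArraySerializer serializer = new ByteArraySerializer();
        System.out.println("Creating KafkaProducer");
        KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(producerConfig, serializer, serializer);
        for (int i = 0; i < 100; i++) {
            String msgstr = msg + i;
            // Encode explicitly as UTF-8 instead of relying on the platform default charset
            byte[] message = msgstr.getBytes(StandardCharsets.UTF_8);
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic1", message);
            System.out.println("Sent:" + msgstr);
            kafkaProducer.send(record); // asynchronous; close() below flushes pending sends
        }
        kafkaProducer.close();
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        sendMessage("Kafka test message 2/27 3:32");
    }

}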

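import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class Consumer {
    public static String getMessage() {
        Map<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.put("bootstrap.servers",
                "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");
        consumerConfig.put("group.id", "dj-group");
        consumerConfig.put("enable.auto.commit", "true");
        // Only used when the group has no valid committed offset for a partition
        consumerConfig.put("auto.offset.reset", "earliest");
        ByteArrayDeserializer deserializer = new ByteArrayDeserializer();
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerConfig, deserializer, deserializer);

        kafkaConsumer.subscribe(Arrays.asList("topic1"));
        while (true) {
            // Wait up to 100 ms for records (newer clients use poll(Duration) instead)
            ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(100);
            System.out.println(records.count() + " of records received.");
            for (ConsumerRecord<byte[], byte[]> record : records) {
                // Prints the raw byte values, not the decoded string
                System.out.println(Arrays.toString(record.value()));
            }
        }
    }

    public static void main(String[] args) {
        getMessage();
    }
}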

First I ran Producer on the cluster to send messages to topic1. However, when I ran Consumer, it received nothing and just hung.

The producer works: I was able to read all the messages with the console consumer script that comes with the Kafka install:

./bin/kafka-console-consumer.sh --zookeeper master.mesos:2181/dcos-service-kafka --topic topic1 --from-beginning

So why can't Consumer receive them? This post suggests that a group.id with an old committed offset might be the cause. I only set group.id in the consumer, not in the producer. How do I configure the offset for this group?
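For reference, here is roughly how the starting offset is chosen for a group. A valid committed offset always wins; auto.offset.reset is only consulted when the group has no committed offset for a partition, or the committed one is out of range. So with enable.auto.commit=true, a second run with the same group.id resumes where the first stopped, and "earliest" no longer rewinds it; a fresh group.id (or KafkaConsumer.seekToBeginning) starts over. The sketch below is a plain-Java model of that rule so it runs without a broker; OffsetResetSketch, startingOffset, and the Reset enum are hypothetical names, not part of kafka-clients.

```java
import java.util.OptionalLong;

// Plain-Java model of how a consumer picks its starting offset for one partition.
// Hypothetical helper for illustration only; not part of the Kafka client API.
class OffsetResetSketch {
    enum Reset { EARLIEST, LATEST, NONE }

    /**
     * @param committed offset committed by the group, if any
     * @param logStart  earliest offset still retained in the partition
     * @param logEnd    offset the next produced record will receive
     */
    static long startingOffset(OptionalLong committed, long logStart, long logEnd, Reset policy) {
        // A committed offset inside the retained range wins; the reset policy is ignored.
        if (committed.isPresent()) {
            long c = committed.getAsLong();
            if (c >= logStart && c <= logEnd) {
                return c;
            }
        }
        // No usable committed offset: fall back to auto.offset.reset.
        switch (policy) {
            case EARLIEST: return logStart;
            case LATEST:   return logEnd;
            default: throw new IllegalStateException("no valid committed offset and auto.offset.reset=none");
        }
    }

    public static void main(String[] args) {
        // New group with "earliest": starts at the beginning of the log.
        System.out.println(startingOffset(OptionalLong.empty(), 0, 100, Reset.EARLIEST)); // 0
        // Group that already committed offset 100: resumes there despite "earliest".
        System.out.println(startingOffset(OptionalLong.of(100), 0, 100, Reset.EARLIEST)); // 100
    }
}
```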

asked Feb 27 '17 21:02 by ddd


People also ask

How do I fix consumer lag in Kafka?

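Consuming with more concurrency (more consumers in the group, up to the number of partitions) can help. If offsets are stored in ZooKeeper, ZooKeeper can become a bottleneck: commit offsets less often and use a dedicated ZooKeeper ensemble if possible. The best option is to store offsets on the brokers, which is what the Java KafkaConsumer does by default.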
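Lag itself is just how far the group's committed offset trails the end of each partition. The plain-Java sketch below shows that arithmetic; ConsumerLagSketch and its methods are hypothetical, and the offsets are made-up inputs. In practice the broker reports these numbers, for example in the LAG column of kafka-consumer-groups.sh --describe.

```java
import java.util.HashMap;
import java.util.Map;

// Lag per partition = log-end offset minus the group's committed offset.
// Hypothetical helper; real offsets come from the broker, not hard-coded maps.
class ConsumerLagSketch {
    static Map<Integer, Long> lagPerPartition(Map<Integer, Long> logEnd, Map<Integer, Long> committed) {
        Map<Integer, Long> lag = new HashMap<>();
        for (Map.Entry<Integer, Long> e : logEnd.entrySet()) {
            // Simplification: a partition with no commit is counted from offset 0.
            long consumedUpTo = committed.getOrDefault(e.getKey(), 0L);
            lag.put(e.getKey(), Math.max(0L, e.getValue() - consumedUpTo));
        }
        return lag;
    }

    static long totalLag(Map<Integer, Long> lag) {
        long sum = 0;
        for (long l : lag.values()) {
            sum += l;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Three partitions; partition 2 has no committed offset yet.
        Map<Integer, Long> logEnd = Map.of(0, 34L, 1, 33L, 2, 33L);
        Map<Integer, Long> committed = Map.of(0, 34L, 1, 10L);
        Map<Integer, Long> lag = lagPerPartition(logEnd, committed);
        System.out.println("total lag = " + totalLag(lag)); // total lag = 56
    }
}
```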

How can I speed up Kafka consumer?

Increasing the number of partitions and the number of brokers in a cluster will lead to increased parallelism of message consumption, which in turn improves the throughput of a Kafka cluster; however, the time required to replicate data across replica sets will also increase.
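On the consumer side, the partition count caps parallelism: within one group each partition is read by at most one member at a time, so consumers beyond the partition count sit idle. The sketch below imitates a range-style split of one topic's partitions across a group. RangeAssignSketch is a hypothetical name, and this is a simplification of Kafka's built-in range assignor, not its actual code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified range-style assignment of one topic's partitions to a consumer group.
// Hypothetical helper for illustration; not the kafka-clients RangeAssignor.
class RangeAssignSketch {
    static Map<String, List<Integer>> assign(List<String> consumers, int partitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (consumers.isEmpty()) {
            return result;
        }
        List<String> sorted = new ArrayList<>(consumers);
        sorted.sort(null); // natural order of member ids
        int perConsumer = partitions / sorted.size();
        int extra = partitions % sorted.size();
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            // The first `extra` consumers each take one additional partition.
            int count = perConsumer + (i < extra ? 1 : 0);
            List<Integer> mine = new ArrayList<>();
            for (int p = 0; p < count; p++) {
                mine.add(next++);
            }
            result.put(sorted.get(i), mine);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(List.of("c1", "c2"), 3));             // {c1=[0, 1], c2=[2]}
        System.out.println(assign(List.of("c1", "c2", "c3", "c4"), 3)); // {c1=[0], c2=[1], c3=[2], c4=[]}
    }
}
```

With three partitions, as topic1 has, a fourth consumer in the same group receives nothing, so adding partitions is what actually raises the ceiling.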

What happens to Kafka message once it is consumed?

The Kafka cluster retains all published messages, whether or not they have been consumed, for a configurable period of time. For example, if the log retention is set to two days, then a message is available for consumption for two days after it is published, after which it is discarded to free up space.

Is there a limit to Kafka message size?

By default, Kafka limits each message to about 1 MB (configurable, for example through the broker's message.max.bytes setting). Very large messages are considered inefficient and an anti-pattern in Apache Kafka.


1 Answer

As it turns out, kafkaConsumer.subscribe(Arrays.asList("topic1")); is what causes poll() to hang. According to "Kafka Consumer does not receive messages", there are two ways to connect a consumer to a topic: assign and subscribe. After I replaced subscribe with the lines below, it started working.

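    // assign() pins the consumer to explicit partitions and skips group management (no rebalancing).
    // Note: this reads only partition 0; topic1 was created with 3 partitions.
    TopicPartition tp = new TopicPartition("topic1", 0);
    List<TopicPartition> tps = Arrays.asList(tp);
    kafkaConsumer.assign(tps);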

However, the output shows arrays of numbers rather than the strings the producer sent. I guess that is a separate issue.
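The numbers are most likely the raw bytes of each value: Consumer prints record.value() with Arrays.toString, which formats a byte[] as a list of signed integers. A minimal stdlib-only sketch of the difference follows, assuming the producer encoded its strings as UTF-8 (String.getBytes() with no argument uses the platform default charset, which is UTF-8 on most Linux JVMs and always on Java 18+). DecodeValueSketch is a hypothetical name. In the consumer loop the equivalent change would be printing new String(record.value(), StandardCharsets.UTF_8), or switching the value deserializer to Kafka's StringDeserializer, which decodes UTF-8 by default.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Why Arrays.toString(byte[]) shows numbers, and how to recover the text.
// Hypothetical helper; assumes the bytes were produced with UTF-8.
class DecodeValueSketch {
    static String decode(byte[] value) {
        return new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] value = "Kafka0".getBytes(StandardCharsets.UTF_8);
        System.out.println(Arrays.toString(value)); // [75, 97, 102, 107, 97, 48]
        System.out.println(decode(value));          // Kafka0
    }
}
```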

answered Nov 15 '22 21:11 by ddd