
How to check if Kafka Consumer is ready

I have the Kafka consumer's offset reset policy (`auto.offset.reset`) set to `latest`, and the consumer misses the first few messages. If I add a 20-second sleep before I start sending messages to the input topic, everything works as desired. I am not sure whether the problem is that the consumer takes a long time to rebalance partitions. Is there a way to know whether the consumer is ready before I start sending messages?

Nagireddy Hanisha asked Jan 03 '18 06:01


People also ask

How do I verify if Kafka consumer consumed messages?

You can use `ConsumerGroupCommand` (the class behind the `kafka-consumer-groups.sh` tool) to check whether a given consumer group has finished processing all messages in a particular topic: zero lag on every partition indicates that the messages have been consumed and their offsets committed by the consumer.
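Per partition, lag is the log-end offset minus the group's committed offset, and the group has caught up when every partition's lag is zero. Below is a minimal, JDK-only sketch of that arithmetic. It assumes both offset maps (keyed by partition number, a simplification) have already been fetched, for example from the `LOG-END-OFFSET` and `CURRENT-OFFSET` columns of `kafka-consumer-groups.sh --describe --group <group>`, or from the Java `Admin` client (`listConsumerGroupOffsets` for committed offsets, `listOffsets` with `OffsetSpec.latest()` for end offsets). `LagCheck` and its methods are hypothetical names.

```java
import java.util.Map;

/** Hypothetical helper: computes consumer lag from already-fetched offset maps. */
final class LagCheck {

    private LagCheck() {}

    /**
     * Lag for one partition = log-end offset - committed offset.
     * Assumption for this sketch: a partition with no committed offset counts
     * as committed at 0, i.e. nothing from it has been consumed yet.
     */
    static long lag(Map<Integer, Long> endOffsets, Map<Integer, Long> committed, int partition) {
        long end = endOffsets.getOrDefault(partition, 0L);
        long done = committed.getOrDefault(partition, 0L);
        return Math.max(0L, end - done);
    }

    /** True when every partition that has an end offset shows zero lag. */
    static boolean fullyConsumed(Map<Integer, Long> endOffsets, Map<Integer, Long> committed) {
        for (int partition : endOffsets.keySet()) {
            if (lag(endOffsets, committed, partition) != 0L) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<Integer, Long> end = Map.of(0, 120L, 1, 80L);
        Map<Integer, Long> committed = Map.of(0, 120L, 1, 75L);
        System.out.println("lag p1 = " + lag(end, committed, 1)); // prints "lag p1 = 5"
        System.out.println("fully consumed = " + fullyConsumed(end, committed)); // prints "fully consumed = false"
    }
}
```

Treating a missing committed offset as 0 overstates lag if the log start offset has moved past 0 (for example after retention deletes old segments); a real check might use the partition's beginning offset instead.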

How do I know if a Kafka topic has messages?

You can count the number of messages in a Kafka topic simply by consuming the entire topic and counting how many messages are read. To do this from the command line, you can use the kcat tool, which can act as a consumer (and producer) and is built around the Unix philosophy of pipelines.


2 Answers

  • You can call `consumer.assignment()`, which returns the set of partitions currently assigned to the consumer, and verify that all of the partitions available for the topic are assigned. Note that this set stays empty until `poll()` has triggered the group join.

  • If you are using the spring-kafka project, you can add the spring-kafka-test dependency and use the method below to wait for partition assignment, but you need access to the listener container: `ContainerTestUtils.waitForAssignment(Object container, int partitions);`
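Instead of polling `consumer.assignment()`, another option is to pass a `ConsumerRebalanceListener` to `subscribe` and signal readiness from its `onPartitionsAssigned` callback. With `auto.offset.reset=latest`, an assignment alone may not be the whole story: the starting offset of each partition still has to be resolved, and calling `consumer.position(partition)` inside the callback blocks until it is. The JDK-only sketch below models just the signalling part; `ReadinessGate` is a hypothetical class, partitions are plain integers, and the wiring to a real `KafkaConsumer` described in the comments is an assumption.

```java
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical gate: the consumer thread reports partitions, a test thread waits for all of them. */
final class ReadinessGate {

    private final int expectedPartitions;
    // Partitions reported so far; revocations are deliberately ignored in this sketch.
    private final Set<Integer> assigned = ConcurrentHashMap.newKeySet();
    private final CountDownLatch ready = new CountDownLatch(1);

    ReadinessGate(int expectedPartitions) {
        this.expectedPartitions = expectedPartitions;
    }

    /**
     * Called on the consumer thread, e.g. from ConsumerRebalanceListener.onPartitionsAssigned,
     * ideally after consumer.position(tp) has been called for each partition.
     */
    void onPartitionsAssigned(Collection<Integer> partitions) {
        assigned.addAll(partitions);
        if (assigned.size() >= expectedPartitions) {
            ready.countDown(); // releasing an already-released latch is a no-op
        }
    }

    /** Called on the producing/test thread; returns false on timeout or interrupt. */
    boolean awaitReady(long timeout, TimeUnit unit) {
        try {
            return ready.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag for the caller
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReadinessGate gate = new ReadinessGate(3);
        // Simulated consumer thread: partitions arrive over two rebalance callbacks.
        Thread consumer = new Thread(() -> {
            gate.onPartitionsAssigned(Set.of(0, 1));
            gate.onPartitionsAssigned(Set.of(2));
        });
        consumer.start();
        System.out.println("ready = " + gate.awaitReady(5, TimeUnit.SECONDS)); // prints "ready = true"
        consumer.join();
    }
}
```

The test thread would call `awaitReady` and only start producing once it returns `true`, which replaces a fixed 20-second sleep with a wait that ends as soon as the consumer is actually assigned.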

donm answered Sep 19 '22 13:09


You can do the following:

I have a test that reads data from a Kafka topic.
`KafkaConsumer` is not thread-safe, so you can't share it between threads, but you can pass an `AtomicReference<Set<TopicPartition>> assignment` parameter, update it in the consumer thread, and read it from another thread.

For example, here is a snippet of working code from a test project:

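    import java.time.Duration;
    import java.util.Collections;
    import java.util.List;
    import java.util.Optional;
    import java.util.Set;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.stream.Collectors;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    // Event, CommonUtils4Tst, MainTest and queueKafkaConsumer(...) are project-specific (not shown).

    private void readAvro(String readFromKafka,
                          AtomicBoolean needStop,
                          List<Event> events,
                          String bootstrapServers,
                          int readTimeout) {
        // Latest assignment snapshot: written by the consumer thread, read by this thread.
        AtomicReference<Set<TopicPartition>> assignment = new AtomicReference<>();
        new Thread(() -> readAvro(bootstrapServers, readFromKafka, needStop, events, readTimeout, assignment)).start();

        long startTime = System.currentTimeMillis();
        long maxWaitingTime = 30_000;
        // Check once per second until the consumer reports at least one assigned partition.
        while (System.currentTimeMillis() - startTime < maxWaitingTime) {
            Set<TopicPartition> assignments = Optional.ofNullable(assignment.get()).orElse(Collections.emptySet());
            System.out.println("[!kafka-consumer!] Assignments [" + assignments.size() + "]: "
                    + assignments.stream().map(v -> String.valueOf(v.partition())).collect(Collectors.joining(",")));
            if (!assignments.isEmpty()) {
                break;
            }
            try {
                Thread.sleep(1_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                needStop.set(true);
                break;
            }
        }
        System.out.println("Subscribed! Wait summary: " + (System.currentTimeMillis() - startTime) + " ms");
    }

    private void readAvro(String bootstrapServers,
                          String readFromKafka,
                          AtomicBoolean needStop,
                          List<Event> events,
                          int readTimeout,
                          AtomicReference<Set<TopicPartition>> assignment) {

        KafkaConsumer<String, byte[]> consumer = (KafkaConsumer<String, byte[]>) queueKafkaConsumer(bootstrapServers, "latest");
        consumer.subscribe(Collections.singletonList(readFromKafka));
        System.out.println("Subscribed to topic: " + readFromKafka);

        long started = System.currentTimeMillis();
        try {
            while (!needStop.get()) {
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(1_000));
                // assignment() is empty until poll() has joined the group, so publish it after polling.
                assignment.set(consumer.assignment());
                // events is filled here and read by the test thread, so it should be a
                // thread-safe list (e.g. Collections.synchronizedList).
                events.addAll(CommonUtils4Tst.readEvents(records));

                if (readTimeout == -1) {
                    if (!events.isEmpty()) {
                        break; // stop as soon as anything has been read
                    }
                } else if (System.currentTimeMillis() - started > readTimeout) {
                    break; // readTimeout (ms) has elapsed
                }
            }
        } finally {
            // Always signal the other threads and release the consumer, even on failure.
            needStop.set(true);
            synchronized (MainTest.class) {
                MainTest.class.notifyAll();
            }
            consumer.close();
        }
    }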

P.S.
needStop - global flag used to stop all running threads, if any, on failure or success
events - list of objects that I want to check
readTimeout - how long (in milliseconds) to wait while reading data; if readTimeout == -1, stop as soon as anything has been read
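The waiting loop in the first `readAvro` method can be factored into a small reusable helper. The sketch below is JDK-only and uses a hypothetical `AssignmentWait` class; it polls the shared `AtomicReference` against a `System.nanoTime()` deadline and restores the interrupt flag instead of swallowing it. Partitions are modelled as plain integers here rather than `TopicPartition`.

```java
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical helper: waits for another thread to publish a non-empty assignment snapshot. */
final class AssignmentWait {

    private AssignmentWait() {}

    /**
     * Checks {@code assignment} every {@code pollMillis} ms until it holds a non-empty set
     * or {@code maxWaitMillis} elapses. Returns the last snapshot seen (empty if none).
     */
    static <T> Set<T> awaitNonEmpty(AtomicReference<Set<T>> assignment,
                                    long maxWaitMillis, long pollMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
        while (System.nanoTime() - deadline < 0) { // overflow-safe deadline comparison
            Set<T> current = assignment.get();
            if (current != null && !current.isEmpty()) {
                return current;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                break;
            }
        }
        Set<T> last = assignment.get();
        return last == null ? Set.<T>of() : last;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicReference<Set<Integer>> assignment = new AtomicReference<>();
        // Simulated consumer thread: publishes its assignment after a short delay.
        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            assignment.set(Set.of(0, 1));
        });
        consumer.start();
        Set<Integer> seen = awaitNonEmpty(assignment, 5_000, 50);
        System.out.println("assigned partitions: " + seen.size()); // prints "assigned partitions: 2"
        consumer.join();
    }
}
```

In the test above, the caller would check the returned set (empty means the wait timed out) before producing messages to the input topic.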

Alexey Alexeenka answered Sep 18 '22 13:09