
No current assignment for partition occurs even after poll in Kafka

I have a Java 8 application working with Apache Kafka 2.11-0.10.1.0. I need to use the seek feature to poll old messages from partitions. However, I get a No current assignment for partition exception every time I try to seekByOffset. Here is the class responsible for seeking topics to the specified timestamp:

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
 * The main purpose of this class is to move fetching point for each partition of the {@link KafkaConsumer}
 * to some offset which is determined either by timestamp or by offset number.
 */
public class KafkaSeeker {
    public static final long APP_STARTUP_TIME = Instant.now().toEpochMilli();

    private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
    private final KafkaConsumer<String, String> kafkaConsumer;
    private ConsumerRecords<String, String> polledRecords;

    public KafkaSeeker(KafkaConsumer<String, String> kafkaConsumer) {
        this.kafkaConsumer = kafkaConsumer;
        this.polledRecords = new ConsumerRecords<>(Collections.emptyMap());
    }

    /**
     * For each assigned or subscribed topic {@link org.apache.kafka.clients.consumer.KafkaConsumer#seek(TopicPartition, long)}
     * fetching pointer to the specified {@code timestamp}.
     * If no messages were found in each partition for a topic,
     * then {@link org.apache.kafka.clients.consumer.KafkaConsumer#seekToEnd(Collection)} will be called.
     *
     * Due to {@link KafkaConsumer#subscribe(Pattern)} and {@link KafkaConsumer#assign(Collection)} laziness
     * method needs to execute dummy {@link KafkaConsumer#poll(long)} method. All {@link ConsumerRecords} which were
     * polled from buffer are swallowed and produce warning logs.
     *
     * @param timestamp is used to find proper offset to seek to
     * @param topics are used to seek only specific topics. If not specified or empty, all subscribed topics are used.
     */
    public Map<TopicPartition, OffsetAndTimestamp> seek(long timestamp, Collection<String> topics) {
        this.polledRecords = kafkaConsumer.poll(0);
        Collection<TopicPartition> topicPartitions;
        if (CollectionUtils.isEmpty(topics)) {
            topicPartitions = kafkaConsumer.assignment();
        } else {
            topicPartitions = topics.stream()
                    .map(it -> {
                        List<Integer> partitions = kafkaConsumer.partitionsFor(it).stream()
                                .map(PartitionInfo::partition).collect(Collectors.toList());
                        return partitions.stream().map(partition -> new TopicPartition(it, partition));
                    })
                    .flatMap(it -> it)
                    .collect(Collectors.toList());
        }

        if (topicPartitions.isEmpty()) {
            throw new IllegalStateException("Kafka consumer doesn't have any subscribed topics.");
        }

        Map<TopicPartition, Long> timestampsByTopicPartitions = topicPartitions.stream()
                .collect(Collectors.toMap(Function.identity(), topicPartition -> timestamp));
        Map<TopicPartition, Long> beginningOffsets = kafkaConsumer.beginningOffsets(topicPartitions);
        Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampsByTopicPartitions);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            if (entry.getValue() != null) {
                LOGGER.info("Kafka seek topic:partition [{}:{}] from [{} offset] to [{} offset].",
                        topicPartition.topic(),
                        topicPartition.partition(),
                        beginningOffsets.get(topicPartition),
                        entry.getValue());
                kafkaConsumer.seek(topicPartition, entry.getValue().offset());
            } else {
                LOGGER.info("Kafka seek topic:partition [{}:{}] from [{} offset] to the end of partition.",
                        topicPartition.topic(),
                        topicPartition.partition(),
                        beginningOffsets.get(topicPartition));
                kafkaConsumer.seekToEnd(Collections.singleton(topicPartition));
            }
        }
        return offsets;
    }

    public ConsumerRecords<String, String> getPolledRecords() {
        return polledRecords;
    }
}

Before calling the method, I subscribe the consumer to a single topic like this: consumer.subscribe(singletonList(kafkaTopic));. kafkaConsumer.assignment() then returns zero assigned TopicPartitions. If I specify the topic and look up its partitions, I get valid TopicPartitions, but the seek call on them fails with the error in the title. What am I missing?

Praytic asked Feb 01 '19 13:02



2 Answers

The reliable way to seek and to check the current assignment is to wait for the onPartitionsAssigned() callback after subscribing. On a newly created (not yet connected) consumer, calling poll() once does not guarantee that it will immediately be connected and assigned partitions.

As a basic example, the code below subscribes to a topic and, in the assignment callback, seeks to the desired position. The poll loop then only sees records from the seek position, not from the previously committed or reset offset.

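import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SeekOnAssignment {

    // Offsets to seek to once the matching partition has been assigned.
    public static final Map<TopicPartition, Long> offsets =
            Collections.singletonMap(new TopicPartition("testtopic", 0), 5L);

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {

            consumer.subscribe(Collections.singletonList("testtopic"), new ConsumerRebalanceListener() {

                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}

                // Invoked from within poll() once the group has assigned partitions,
                // so seek() is valid for every partition passed in here.
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    System.out.println("Assigned " + partitions);
                    for (TopicPartition tp : partitions) {
                        OffsetAndMetadata oam = consumer.committed(tp);
                        if (oam != null) {
                            System.out.println("Current offset is " + oam.offset());
                        } else {
                            System.out.println("No committed offsets");
                        }
                        Long offset = offsets.get(tp);
                        if (offset != null) {
                            System.out.println("Seeking to " + offset);
                            consumer.seek(tp, offset);
                        }
                    }
                }
            });

            // poll(Duration) needs kafka-clients 2.0 or newer; older clients only have poll(long).
            for (int i = 0; i < 10; i++) {
                System.out.println("Calling poll");
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100L));
                for (ConsumerRecord<String, String> r : records) {
                    System.out.println("record from " + r.topic() + "-" + r.partition() + " at offset " + r.offset());
                }
            }
        }
    }
}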
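
The example above seeks to fixed offsets, whereas the question needs a timestamp-based seek. The same callback pattern can carry that logic. The sketch below models only the control flow in plain Java so that it runs without a broker. Everything in it is a hypothetical stand-in rather than Kafka API: SeekTarget stands for the consumer's seek and seekToEnd calls, the lookup function stands for offsetsForTimes, and partitions are plain strings instead of TopicPartition objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/** Holds a requested timestamp and applies it only when partitions are actually assigned. */
class DeferredTimestampSeek {

    /** Hypothetical stand-in for the consumer's seek(...) and seekToEnd(...) calls. */
    interface SeekTarget {
        void seek(String partition, long offset);
        void seekToEnd(String partition);
    }

    private final long timestamp;
    private final SeekTarget target;
    // Stand-in for offsetsForTimes: yields null when no record exists at or after the timestamp.
    private final BiFunction<String, Long, Long> offsetForTimestamp;

    DeferredTimestampSeek(long timestamp, SeekTarget target,
                          BiFunction<String, Long, Long> offsetForTimestamp) {
        this.timestamp = timestamp;
        this.target = target;
        this.offsetForTimestamp = offsetForTimestamp;
    }

    /** Intended to be called from the rebalance listener's onPartitionsAssigned callback. */
    void onPartitionsAssigned(Collection<String> partitions) {
        for (String partition : partitions) {
            Long offset = offsetForTimestamp.apply(partition, timestamp);
            if (offset != null) {
                target.seek(partition, offset);
            } else {
                // Nothing at or after the timestamp: fall back to the end, as the question's code does.
                target.seekToEnd(partition);
            }
        }
    }

    /** Runs the sketch against a recording SeekTarget and returns the calls it made. */
    static List<String> demo() {
        List<String> calls = new ArrayList<>();
        SeekTarget recorder = new SeekTarget() {
            @Override
            public void seek(String partition, long offset) {
                calls.add("seek " + partition + " -> " + offset);
            }

            @Override
            public void seekToEnd(String partition) {
                calls.add("seekToEnd " + partition);
            }
        };
        // Made-up lookup data: only partition 0 has a record at or after the timestamp.
        Map<String, Long> index = Collections.singletonMap("testtopic-0", 5L);
        DeferredTimestampSeek seeker =
                new DeferredTimestampSeek(1_000L, recorder, (partition, ts) -> index.get(partition));
        seeker.onPartitionsAssigned(Arrays.asList("testtopic-0", "testtopic-1"));
        return calls;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In a real consumer, the body of onPartitionsAssigned would presumably call offsetsForTimes for the assigned partitions and then seek or seekToEnd on the consumer itself. The point of the sketch is only that no seek is attempted until the callback delivers the partitions.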
Mickael Maison answered Sep 29 '22 12:09


KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
// Look up all partitions of the topic
List<TopicPartition> partitions = consumer
        .partitionsFor(topic)
        .stream()
        .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
        .collect(Collectors.toList());
// Explicitly assign the partitions to our consumer; this takes effect immediately
consumer.assign(partitions);
// It is now safe to seek, query offsets, or poll

Please note that this disables consumer group management and rebalancing. When possible, use @Mickael Maison's approach.
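
The difference between the two answers comes down to when the consumer's assignment is populated. The toy model below is plain Java, not Kafka code. ToyConsumer and all of its methods are hypothetical simplifications, meant only to illustrate why seek fails right after subscribe yet works right after assign. It ignores details such as the real client rejecting a mix of subscribe and assign on the same consumer.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Toy model (not Kafka code) of how a consumer decides whether seek() is allowed. */
class ToyConsumer {
    private final Set<String> subscribedTopics = new HashSet<>();
    private final Set<String> assignment = new HashSet<>();

    /** Like subscribe(): records interest only; the assignment stays empty for now. */
    void subscribe(Collection<String> topics) {
        subscribedTopics.addAll(topics);
    }

    /** Stand-in for the group rebalance completing, which can take several poll() calls. */
    void completeRebalance(Collection<String> partitions) {
        if (subscribedTopics.isEmpty()) {
            throw new IllegalStateException("Not subscribed to any topic");
        }
        assignment.addAll(partitions);
    }

    /** Like assign(): takes effect immediately, with no group coordination involved. */
    void assign(Collection<String> partitions) {
        assignment.clear();
        assignment.addAll(partitions);
    }

    /** Seeking is only valid for partitions in the current assignment. */
    void seek(String partition, long offset) {
        if (!assignment.contains(partition)) {
            throw new IllegalStateException("No current assignment for partition " + partition);
        }
        // A real consumer would record the new fetch position here.
    }

    /** Returns true if seek() is accepted, false if it fails with the error from the question. */
    boolean canSeek(String partition) {
        try {
            seek(partition, 0L);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ToyConsumer subscribed = new ToyConsumer();
        subscribed.subscribe(Collections.singletonList("testtopic"));
        System.out.println(subscribed.canSeek("testtopic-0")); // false: rebalance not finished
        subscribed.completeRebalance(Collections.singletonList("testtopic-0"));
        System.out.println(subscribed.canSeek("testtopic-0")); // true

        ToyConsumer assigned = new ToyConsumer();
        assigned.assign(Collections.singletonList("testtopic-0"));
        System.out.println(assigned.canSeek("testtopic-0")); // true: no rebalance needed
    }
}
```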

JMess answered Sep 29 '22 12:09