
Kafka consumer - assign and seek

I could not understand this API design!

In the code below, we subscribe to a list of topics and the partitions are assigned dynamically. This is totally fine.

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    // Subscribe by topic name; partitions are assigned to this consumer dynamically.
    consumer.subscribe(Arrays.asList("some-topic"));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        // ConsumerRecords is Iterable, so forEach works without StreamSupport.
        records.forEach(r -> System.out.println(r.key() + "::" + r.value()));
    }

My confusion is here:

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    // Manually assign one specific partition, then seek to offset 0 on it.
    TopicPartition partition = new TopicPartition("some-topic", 0);
    consumer.assign(Arrays.asList(partition));
    consumer.seek(partition, 0);

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        records.forEach(r -> System.out.println(r.key() + "::" + r.value()));
    }

Question:

  1. We have already assigned a list of partitions using the assign method. Why does the seek method also ask for the partition? It feels redundant to me.
  2. The seek method already takes the topic, partition, and offset. Why does it require a call to assign first?
KitKarson asked Dec 08 '25 09:12



1 Answer

1 - First, keep in mind that a consumer can be assigned many different topics/partitions through the Kafka API. seek() and assign() have two independent responsibilities, which is why the partition argument may look redundant: assign() decides which partitions the consumer reads, while seek() changes the read position on one of them. Whenever you need to go back to an offset, or reprocess from an offset for whatever reason, you use seek(), and for that seek() needs the topic and partition, whether you used static assignment (assign) or dynamic assignment (subscribe).

You cannot call seek() without specifying a topic/partition; it would be ambiguous in many cases, since an offset only has meaning within a single partition.

2 - Are you sure you need to call assign() before seek()? I know both can be used before calling poll(), but I did not know that assign() was mandatory before seek(). Do you have an error message? (I might check tomorrow anyway and edit this post.)
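To make both points concrete, here is a minimal toy model in plain Java of the bookkeeping involved. It is not the Kafka client: ToyConsumer, Tp, and position() are hypothetical names invented for this sketch, and starting every partition at offset 0 is a simplification. It only illustrates that a consumer keeps one position per assigned partition, so seek has to be told which one to move. As far as I know, the real KafkaConsumer.seek() throws an IllegalStateException when the partition is not currently assigned to the consumer, and the toy mirrors that.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Toy stand-in for a consumer's position bookkeeping; NOT the Kafka client.
class ToyConsumer {

    // Hypothetical analogue of a topic-partition pair (topic name + partition number).
    static final class Tp {
        final String topic;
        final int partition;

        Tp(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Tp
                    && ((Tp) o).topic.equals(topic)
                    && ((Tp) o).partition == partition;
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    // One read position per assigned partition.
    private final Map<Tp, Long> positions = new HashMap<>();

    // Replaces the current assignment. Assumption: every partition starts at offset 0 here.
    void assign(Collection<Tp> partitions) {
        positions.clear();
        for (Tp tp : partitions) {
            positions.put(tp, 0L);
        }
    }

    // Moves the position of exactly one partition; the others are untouched.
    void seek(Tp tp, long offset) {
        if (!positions.containsKey(tp)) {
            throw new IllegalStateException("No current assignment for partition " + tp);
        }
        positions.put(tp, offset);
    }

    // Returns the current position of an assigned partition.
    long position(Tp tp) {
        if (!positions.containsKey(tp)) {
            throw new IllegalStateException("No current assignment for partition " + tp);
        }
        return positions.get(tp);
    }

    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer();
        Tp p0 = new Tp("some-topic", 0);
        Tp p1 = new Tp("some-topic", 1);

        consumer.assign(Arrays.asList(p0, p1));
        consumer.seek(p1, 42L); // only p1 moves; p0 keeps its own position

        System.out.println(p0 + " -> " + consumer.position(p0));
        System.out.println(p1 + " -> " + consumer.position(p1));

        try {
            consumer.seek(new Tp("other-topic", 0), 5L); // never assigned
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With two partitions assigned, a bare seek(42) would not say which position to move, which is why the partition is part of the call. With subscribe, the assignment only exists after the group has handed out partitions, so in the real client a seek would have to wait until then.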

Yannick

Yannick answered Dec 10 '25 22:12



