Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

kafka consumer seek is not working: AssertionError: Unassigned partition

the kafka consumer con defined below works perfectly fine when I try to receive messages form my topic; however, it's giving me trouble when I try to change the offset using seek method or any of its variations. i.e. seek_to_beginning, seek_to_end

from kafka import KafkaConsumer, TopicPartition

con = KafkaConsumer(my_topic, bootstrap_servers = my_bootstrapservers, group_id = my_groupid)
p = con.partitions_for_topic(my_topic)
my_partition = p.pop()
tp = TopicPartition(topic = my_topic, partition = my_partition)
print ('*** tp: ', tp)
con.seek_to_beginning(tp)

it generates the following output and the error below:

*** tp:  TopicPartition(topic='mytopic', partition=0)
File "/myhome/anaconda3/lib/python3.6/site-packages/kafka/consumer/group.py", line 735, in seek_to_beginning
assert p in self._subscription.assigned_partitions(), 'Unassigned partition'
AssertionError: Unassigned partition

The 'Unassigned partition' error does not look valid to me because I just get the partition and the TopicPartition from the consumer itself and pass it back into the seek method so it is definitely assigned. any ideas?

like image 822
Zahra Avatar asked Mar 09 '23 11:03

Zahra


1 Answers

I was able to figure out a way to seek on my topic partition by changing how the partition is assigned to my consumer. The issue seems to be that the consumer could not work with the topic partition that was subscribed to by default so I had to change the constructor to prevent subscribing to the topic by default and then explicitly assign a partition to my consumer.

from kafka import KafkaConsumer, TopicPartition

con = KafkaConsumer(bootstrap_servers = my_bootstrapservers)
tp = TopicPartition(my_topic, 0)
con.assign([tp])
con.seek_to_beginning()
con.seek(tp, 1000000)
like image 138
Zahra Avatar answered May 04 '23 21:05

Zahra