I am new to the Kafka Java API and I am working on consuming records from a particular Kafka topic.
I understand that I can use the subscribe() method to start polling records from the topic. Kafka also provides the assign() method if I want to poll records from selected partitions of a topic.
Is this the only difference between the two?
An assignment essentially functions as an exclusive lock on a given set of partitions. Each partition is assigned to exactly one consumer per group, and only the consumer that owns that partition will be able to read its data while the assignment persists. Each consumer can be assigned many partitions, however.
Kafka allows you to set the position explicitly with seek(TopicPartition, long). Special methods for seeking to the earliest and latest offsets the server maintains are also available (seekToBeginning(Collection) and seekToEnd(Collection), respectively).
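To make the seek methods concrete, here is a minimal sketch that assumes the kafka-clients library is on the classpath. The class name SeekSketch, the props helper, partition 0, and offset 42 are placeholders I made up for illustration. Note that seek only works on partitions the consumer currently has assigned, so the sketch calls assign first.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

final class SeekSketch {
    // Hypothetical helper: minimal config for a consumer that does not use group management.
    static Properties props(String bootstrapServers) {
        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrapServers);
        p.put("key.deserializer", StringDeserializer.class.getName());
        p.put("value.deserializer", StringDeserializer.class.getName());
        p.put("enable.auto.commit", "false"); // no group.id here, so offsets are not committed
        return p;
    }

    // Polls once and prints whatever came back.
    static void pollAndPrint(KafkaConsumer<String, String> consumer, String label) {
        consumer.poll(Duration.ofSeconds(1)).forEach(r ->
                System.out.printf("%s: partition=%d offset=%d value=%s%n",
                        label, r.partition(), r.offset(), r.value()));
    }

    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("usage: SeekSketch <bootstrap.servers> <topic>");
            return;
        }
        TopicPartition tp = new TopicPartition(args[1], 0); // partition 0 is an assumption
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props(args[0]))) {
            consumer.assign(List.of(tp));          // seek requires an assigned partition

            consumer.seek(tp, 42L);                // next poll starts at offset 42 (arbitrary)
            pollAndPrint(consumer, "from 42");

            consumer.seekToBeginning(List.of(tp)); // earliest offset the server still has
            pollAndPrint(consumer, "from beginning");

            consumer.seekToEnd(List.of(tp));       // only records produced from now on
            pollAndPrint(consumer, "from end");
        }
    }
}
```

seekToBeginning and seekToEnd are evaluated lazily: the actual offset lookup happens on the next poll or position call, which is why each seek above is followed by a poll.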
Yes, subscribe needs a group.id, because each consumer in the group is dynamically assigned partitions for the topics passed to subscribe, and each partition is consumed by only one consumer in that group. This is achieved by balancing the partitions between all members of the consumer group so that each partition is assigned to exactly one consumer in the group.
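The "exactly one consumer per partition" rule can be pictured with a toy model in plain Java. This is not Kafka's actual assignor, just a round-robin sketch under my own assumptions (the class ToyGroupBalance and the member names are invented) to show how ownership shifts when the group grows.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ToyGroupBalance {
    // Toy round-robin: partition p goes to member (p mod memberCount).
    // Every partition ends up with exactly one owner; a member may own several.
    static Map<String, List<Integer>> balance(List<String> members, int partitionCount) {
        if (members.isEmpty()) {
            throw new IllegalArgumentException("a group needs at least one member");
        }
        Map<String, List<Integer>> owned = new LinkedHashMap<>();
        for (String member : members) {
            owned.put(member, new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            owned.get(members.get(p % members.size())).add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        // Two members share five partitions.
        System.out.println(balance(List.of("c1", "c2"), 5));       // {c1=[0, 2, 4], c2=[1, 3]}
        // A third member joins: the same partitions are redistributed.
        System.out.println(balance(List.of("c1", "c2", "c3"), 5)); // {c1=[0, 3], c2=[1, 4], c3=[2]}
    }
}
```

With subscribe, this redistribution is done for you whenever the group changes; with assign, you pick the partitions yourself and nothing is redistributed.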
assign manually assigns a list of partitions to this consumer. This method does not use the consumer's group management functionality, so no group.id is needed just to read records.
The main difference is that with assign(Collection) you give up dynamic partition assignment and consumer group coordination.
It is also possible for the consumer to manually assign specific partitions (similar to the older "simple" consumer) using assign(Collection). In this case, dynamic partition assignment and consumer group coordination will be disabled.
subscribe
public void subscribe(java.util.Collection<java.lang.String> topics)
The subscribe method subscribes to the given list of topics to get dynamically assigned partitions. If the given list of topics is empty, it is treated the same as unsubscribe().
As part of group management, the consumer will keep track of the list of consumers that belong to a particular group and will trigger a rebalance operation if any one of the following events occurs:
The number of partitions changes for any of the subscribed topics
A subscribed topic is created or deleted
An existing member of the consumer group dies
A new member is added to an existing consumer group via the join API
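If you want to see these rebalances happen, subscribe has an overload that takes a ConsumerRebalanceListener. Below is a minimal sketch, assuming kafka-clients is on the classpath. The class name SubscribeSketch, the props helper, and the bounded loop of ten polls are my own choices for illustration.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

final class SubscribeSketch {
    // Hypothetical helper: subscribe() uses group management, so group.id must be set.
    static Properties props(String bootstrapServers, String groupId) {
        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrapServers);
        p.put("group.id", groupId);
        p.put("key.deserializer", StringDeserializer.class.getName());
        p.put("value.deserializer", StringDeserializer.class.getName());
        return p;
    }

    // Logs which partitions this consumer loses and gains on each rebalance.
    static final class LoggingListener implements ConsumerRebalanceListener {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            System.out.println("revoked: " + partitions);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            System.out.println("assigned: " + partitions);
        }
    }

    public static void main(String[] args) {
        if (args.length < 3) {
            System.err.println("usage: SubscribeSketch <bootstrap.servers> <group.id> <topic>");
            return;
        }
        try (KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(props(args[0], args[1]))) {
            consumer.subscribe(List.of(args[2]), new LoggingListener());
            // Partitions are only assigned once poll() runs; ten polls keep the demo bounded.
            for (int i = 0; i < 10; i++) {
                consumer.poll(Duration.ofSeconds(1)).forEach(r ->
                        System.out.printf("partition=%d offset=%d value=%s%n",
                                r.partition(), r.offset(), r.value()));
            }
        }
    }
}
```

Starting a second copy with the same group.id while the first is still polling should make both instances log a revoke followed by a new, smaller assignment.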
assign
public void assign(java.util.Collection<TopicPartition> partitions)
The assign method manually assigns a list of partitions to this consumer. If the given list of topic partitions is empty, it is treated the same as unsubscribe().
Manual topic assignment through this method does not use the consumer's group management functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic metadata change.
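One practical consequence: a single consumer instance uses either subscribe or assign, not both. The javadoc for both methods lists IllegalStateException for the case where the other mode is already active without an unsubscribe() in between. Here is a small sketch of that check, assuming kafka-clients on the classpath. The names ExclusiveModesSketch, demo-group, and demo-topic are placeholders.

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

final class ExclusiveModesSketch {
    // Returns true if subscribe() after assign() is rejected with IllegalStateException.
    static boolean mixingIsRejected(String bootstrapServers) {
        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrapServers);
        p.put("group.id", "demo-group");      // placeholder; subscribe() requires a group.id
        p.put("enable.auto.commit", "false");
        p.put("key.deserializer", StringDeserializer.class.getName());
        p.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(p)) {
            // Manual mode first: this consumer now owns partition 0 of the placeholder topic.
            consumer.assign(List.of(new TopicPartition("demo-topic", 0)));
            try {
                // Switching to group-managed mode without unsubscribe() should fail.
                consumer.subscribe(List.of("demo-topic"));
                return false;
            } catch (IllegalStateException expected) {
                return true;
            }
        }
    }

    public static void main(String[] args) {
        String bootstrap = args.length > 0 ? args[0] : "localhost:9092";
        System.out.println("subscribe() after assign() rejected: " + mixingIsRejected(bootstrap));
    }
}
```

If you do need to switch modes on the same instance, call unsubscribe() first.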