I am new to Kafka. What I've understood so far regarding the consumer is that there are basically two types of implementation:
1) The High level consumer/consumer group
2) Simple Consumer
The most important point about the high-level abstraction is that it is used when you don't care about handling the offset yourself, while the Simple Consumer provides much finer control over offset management. What confuses me is this: what if I want to run consumers in a multithreaded environment and also want control over the offset? If I use a consumer group, does that mean I must read from the last offset stored in ZooKeeper? Is that the only option I have?
A consumer group is a set of consumers which cooperate to consume data from some topics. The partitions of all the topics are divided among the consumers in the group.
A consumer can be assigned multiple partitions, but within a consumer group each partition of a topic is assigned to exactly one consumer. Hence, multiple consumers from the same consumer group cannot read the same message from a partition.
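To make the "one owner per partition" rule concrete, here is a toy round-robin assignment in plain Java. It is only a sketch: `AssignmentSketch` and `assign` are made-up names, and Kafka's real assignment strategies are more involved than this.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only, not Kafka's actual assignor: it just shows that every
// partition ends up with exactly one owner inside the group.
class AssignmentSketch {

    // Spread partitions 0..partitionCount-1 over the consumers in round-robin order.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // prints {consumer-a=[0, 2, 4], consumer-b=[1, 3]}
        System.out.println(assign(Arrays.asList("consumer-a", "consumer-b"), 5));
    }
}
```

A consumer may own several partitions, but no partition appears in two lists. If there are more consumers than partitions, the extra consumers simply get an empty list and sit idle.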
A group id (the group.id setting) is mandatory for starting a consumer in a group, and it plays a major role in scalable message consumption.
A consumer instance cannot safely be shared between threads, so the rule is one consumer per thread. To run multiple consumers of the same group in one application, run each consumer in its own thread.
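The one-consumer-per-thread rule can be sketched like this in plain Java. `StubConsumer` is a hypothetical stand-in I made up so the example runs without a broker; in a real application each task would create and poll its own Kafka consumer at that point.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class OneConsumerPerThread {

    // Hypothetical stand-in for a real consumer; it remembers the thread that created it.
    static class StubConsumer {
        private final Thread owner = Thread.currentThread();

        String poll() {
            // Toy guard: refuse to be used from any thread other than the owner.
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException("consumer used from a foreign thread");
            }
            return "polled by " + owner.getName();
        }
    }

    // Starts workerCount tasks; each task creates and uses its own consumer.
    static List<String> runWorkers(int workerCount) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(workerCount);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < workerCount; i++) {
                futures.add(pool.submit(() -> {
                    // Created inside the task, so it never leaves this thread.
                    StubConsumer consumer = new StubConsumer();
                    return consumer.poll();
                }));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        runWorkers(3).forEach(System.out::println);
    }
}
```

The point of the design is that the consumer is constructed inside the task that uses it, so no reference to it ever crosses a thread boundary.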
For the most part, the high-level consumer API does not let you control the offset directly.
When the consumer group is first created, you can tell it whether to start with the oldest or newest message that Kafka has stored using the auto.offset.reset property.
You can also control when the high-level consumer commits new offsets to ZooKeeper by setting auto.commit.enable to false.
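As a rough illustration, the two settings could be put into the consumer configuration like this. The helper name `build` and the sample arguments are my own; the property names and the "smallest"/"largest" values are the ones used by the old ZooKeeper-based high-level consumer.

```java
import java.util.Properties;

class HighLevelConsumerConfigSketch {

    // Builds a configuration for the old high-level consumer (hypothetical helper).
    static Properties build(String groupId, String zookeeperConnect) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", zookeeperConnect);
        props.setProperty("group.id", groupId);
        // Used only when the group has no stored offset yet:
        // "smallest" = oldest available message, "largest" = newest.
        props.setProperty("auto.offset.reset", "smallest");
        // Turn off periodic commits so the application decides when to commit.
        props.setProperty("auto.commit.enable", "false");
        return props;
    }

    public static void main(String[] args) {
        // "my-group" and "localhost:2181" are placeholder values.
        System.out.println(build("my-group", "localhost:2181"));
    }
}
```

Note that auto.offset.reset does not override an offset that the group has already stored; it only applies when there is none (or the stored one is out of range).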
Since the high-level consumer stores the offsets in ZooKeeper, your app could access ZooKeeper directly and manipulate the offsets, but that would be outside of the high-level consumer API.
Your question was a little confusing, but you can use the simple consumer in a multi-threaded environment. That's what the high-level consumer does internally.
In Apache Kafka 0.9 and 0.10, consumer group management is handled entirely within Kafka itself: a broker acts as the group coordinator, and a topic stores the group's state.
When a consumer group first subscribes to a topic, the setting of auto.offset.reset determines where consumers begin to consume messages (http://kafka.apache.org/documentation.html#newconsumerconfigs).
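For the new consumer the property names and values differ slightly from the old high-level consumer, which is easy to trip over. A minimal configuration sketch follows; the helper name and the sample broker address are placeholders of mine.

```java
import java.util.Properties;

class NewConsumerConfigSketch {

    // Builds a configuration for the 0.9+ consumer (hypothetical helper).
    static Properties build(String groupId, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // New consumer values: "earliest", "latest" or "none".
        props.setProperty("auto.offset.reset", "earliest");
        // Disable periodic auto-commit so offsets are committed explicitly.
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // "my-group" and "localhost:9092" are placeholder values.
        System.out.println(build("my-group", "localhost:9092"));
    }
}
```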
You can register a ConsumerRebalanceListener to receive a notification when a particular consumer is assigned topics/partitions.
Once the consumer is running, you can use seek, seekToBeginning and seekToEnd to reposition it and get messages from a specific offset. seek affects the next poll for that consumer, and the new position is stored on the next commit (e.g. commitSync, commitAsync, or when the auto.commit.interval.ms interval elapses, if auto-commit is enabled).
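The difference between the consumer's current position and the committed offset is the part that tends to confuse people, so here is a toy model of a single partition. `PartitionCursor` is a made-up class, not the Kafka client; it only mimics the behaviour described above (seek changes what the next poll returns, and nothing is stored until a commit).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of one partition's read position; not the Kafka client API.
class PartitionCursor {
    private final List<String> log;  // messages, list index == offset
    private long position;           // offset the next poll() reads from
    private long committed;          // offset stored by the last commit

    PartitionCursor(List<String> log, long committedOffset) {
        this.log = log;
        this.position = committedOffset;
        this.committed = committedOffset;
    }

    void seek(long offset) { position = offset; }
    void seekToBeginning() { position = 0; }
    void seekToEnd() { position = log.size(); }

    // Returns up to maxRecords messages starting at the current position.
    List<String> poll(int maxRecords) {
        List<String> out = new ArrayList<>();
        while (out.size() < maxRecords && position < log.size()) {
            out.add(log.get((int) position));
            position++;
        }
        return out;
    }

    // Stores the current position, like commitSync/commitAsync would.
    void commit() { committed = position; }

    long position() { return position; }
    long committed() { return committed; }

    public static void main(String[] args) {
        PartitionCursor cursor =
                new PartitionCursor(Arrays.asList("m0", "m1", "m2", "m3"), 3);
        cursor.seek(1);                          // rewind; nothing is stored yet
        System.out.println(cursor.committed());  // prints 3
        System.out.println(cursor.poll(2));      // prints [m1, m2]
        cursor.commit();
        System.out.println(cursor.committed());  // prints 3 (position after m2)
    }
}
```

In this model, a consumer that restarted before the commit would resume from the old committed offset, which is why a seek only "sticks" across restarts once a commit has happened.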
The consumer javadocs mention more specific situations: http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
You can combine the group management provided by Kafka with manual management of offsets via seek(..) once partitions are assigned.