I'm trying to find out which offsets my current high-level consumers are working off. I use Kafka 0.8.2.1, with no "offset.storage" set in the server.properties of Kafka, which, I think, means that offsets are stored in Kafka. (I also verified that no offsets are stored in Zookeeper by checking this path in the Zk shell: /consumers/consumer_group_name/offsets/topic_name/partition_number)
I tried to listen to the __consumer_offsets topic to see which consumer saves what value of offsets, but it did not work.
I tried the following:
I created a config file for the console consumer as follows:
    => more kafka_offset_consumer.config
    exclude.internal.topics=false
and tried two versions of the console consumer scripts:
#1:
    bin/kafka-console-consumer.sh --consumer.config kafka_offset_consumer.config \
      --topic __consumer_offsets --zookeeper localhost:2181
#2:
    ./bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 0 \
      --broker-list localhost:9092 \
      --formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" \
      --consumer.config kafka_offset_consumer.config
Neither worked: each just sits there and does not print anything, even though the consumers are actively consuming and saving offsets.
Am I missing some other configuration or property?
thanks!
Marina
__consumer_offsets is the internal topic where Apache Kafka stores committed consumer offsets. Since Kafka moved offset storage out of Zookeeper to avoid scalability problems, this one topic has taken center stage in managing the offsets for all consumers.
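For what it's worth, all commits for a given group land in a single partition of __consumer_offsets, so reading only one partition (for example --partition 0) can come up empty. As far as I know, the broker picks that partition as abs(groupId.hashCode) % offsets.topic.num.partitions, with 50 partitions by default. Below is a rough shell sketch of that calculation. The function name offsets_partition_for is made up for illustration, it only handles ASCII group names, and the formula should be treated as an assumption to verify against your broker version.

```shell
# Hypothetical helper: guess which __consumer_offsets partition holds a group's commits.
# Assumption: partition = abs(Java String.hashCode(group)) % partition count (default 50).
# Only valid for ASCII group names.
offsets_partition_for() {
  s=$1          # consumer group name
  n=${2:-50}    # number of partitions of __consumer_offsets
  h=0
  while [ -n "$s" ]; do
    rest=${s#?}                  # everything after the first character
    c=${s%"$rest"}               # the first character
    code=$(printf '%d' "'$c")    # its numeric character code
    h=$(( (h * 31 + code) % 4294967296 ))   # emulate 32-bit overflow
    s=$rest
  done
  # Interpret h as a signed 32-bit int and take its absolute value
  if [ "$h" -ge 2147483648 ]; then
    h=$(( 4294967296 - h ))
  fi
  # Integer.MIN_VALUE has no positive counterpart; it is mapped to 0
  if [ "$h" -eq 2147483648 ]; then
    h=0
  fi
  echo $(( h % n ))
}
```

Calling offsets_partition_for my_group prints a partition number that could then be passed to --partition when reading the topic one partition at a time.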
If you want to process a topic from its beginning, you can simply start a new consumer group (i.e., choose an unused group.id) and set auto.offset.reset=earliest. Because there are no committed offsets for a new group, the auto offset reset will trigger and the topic will be consumed from its beginning.
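A minimal sketch of that setup, assuming the newer (bootstrap-server based) consumer: it writes a properties file with a fresh group.id and auto.offset.reset=earliest. The file name, the replay- prefix, and the topic name in the comment are placeholders, not anything from a real setup. Note that the old Zookeeper-based consumer in 0.8.x uses smallest instead of earliest.

```shell
# Pick a group id that has never committed offsets (timestamp suffix is just one way)
GROUP_ID="replay-$(date +%s)"
CONFIG_FILE="${TMPDIR:-/tmp}/replay-consumer.properties"

# Write the consumer config for the fresh group
cat > "$CONFIG_FILE" <<EOF
group.id=$GROUP_ID
auto.offset.reset=earliest
EOF

# The config could then be passed to the console consumer, for example:
# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 \
#   --topic my_topic --consumer.config "$CONFIG_FILE"
```

Using a properties file rather than command-line flags keeps the same settings reusable across the console tools that accept --consumer.config.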
I came across this question when also trying to consume from the __consumer_offsets topic. I managed to figure it out for different Kafka versions and thought I'd share what I found.
For Kafka 0.8.2.x
Note: This uses a Zookeeper connection.
    # Create consumer config
    echo "exclude.internal.topics=false" > /tmp/consumer.config
    # Consume all offsets
    ./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \
      --formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" \
      --zookeeper localhost:2181 --topic __consumer_offsets --from-beginning
For Kafka 0.9.x.x and 0.10.x.x
    # Create consumer config
    echo "exclude.internal.topics=false" > /tmp/consumer.config
    # Consume all offsets
    ./kafka-console-consumer.sh --new-consumer --consumer.config /tmp/consumer.config \
      --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" \
      --bootstrap-server localhost:9092 --topic __consumer_offsets --from-beginning
For Kafka 0.11.x.x - 2.x
    # Create consumer config
    echo "exclude.internal.topics=false" > /tmp/consumer.config
    # Consume all offsets
    ./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \
      --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
      --bootstrap-server localhost:9092 --topic __consumer_offsets --from-beginning
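Since the only thing that really changes between versions is the formatter class, the mapping above can be wrapped in a small helper. This is just a convenience sketch; the function name formatter_for_version is made up, and the version patterns simply mirror the three cases listed above.

```shell
# Hypothetical helper: print the offsets formatter class for a given Kafka version string.
formatter_for_version() {
  case "$1" in
    0.8.2*)
      printf '%s\n' 'kafka.server.OffsetManager$OffsetsMessageFormatter' ;;
    0.9.*|0.10.*)
      printf '%s\n' 'kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter' ;;
    0.11.*|1.*|2.*)
      printf '%s\n' 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter' ;;
    *)
      # Versions outside the ranges covered above are not handled here
      echo "no known formatter for Kafka version: $1" >&2
      return 1 ;;
  esac
}
```

The result can be dropped straight into the command, e.g. --formatter "$(formatter_for_version 0.10.2.1)", which also avoids having to escape the $ in the class name by hand.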