 

Kafka MirrorMaker's consumer not fetching all messages from topics

I am trying to set up a Kafka mirroring mechanism, but the Kafka MirrorMaker's consumer on the source Kafka cluster only reads data that arrives in the topics after the MirrorMaker process is started, i.e. it does not read the data already stored in the topics.

I am running the Kafka MirrorMaker class as follows:

/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config consumer.config --num.streams 2 --producer.config producer.config --whitelist=".*"

consumer.config, to read from the source Kafka cluster:

zookeeper.connect=127.0.0.1:2181
zookeeper.connection.timeout.ms=6000
group.id=kafka-mirror

and producer.config, to produce to the new mirrored Kafka cluster:

metadata.broker.list=localhost:9093
producer.type=sync
compression.codec=none
serializer.class=kafka.serializer.DefaultEncoder

Is there a way to configure the MirrorMaker consumer to read from the beginning of the topics in my source Kafka cluster? This seems strange, because I defined a new consumer group (kafka-mirror) in consumer.config, so I expected the consumer to start from offset 0, i.e. from the beginning of the topics.

Many thanks in advance!

user2795708 asked Dec 09 '22 01:12



2 Answers

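In the consumer properties, add:

auto.offset.reset=earliest

This should work. Note that earliest is the value understood by the newer consumer (configured with bootstrap.servers); the older ZooKeeper-based consumer configured with zookeeper.connect, as in the question, expects smallest instead.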
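Which value is valid depends on which consumer reads the config. Below is a minimal Python sketch, not part of Kafka, that checks a consumer config under one stated assumption: a config containing zookeeper.connect targets the old consumer (smallest/largest, default largest), while one containing bootstrap.servers targets the newer consumer (earliest/latest/none, default latest). The helper names parse_properties and check_offset_reset are hypothetical.

```python
# Hypothetical sanity check for auto.offset.reset in a Kafka consumer config.
# Assumption: zookeeper.connect => old consumer, bootstrap.servers => newer consumer.

OLD_CONSUMER_VALUES = {"smallest", "largest"}
NEW_CONSUMER_VALUES = {"earliest", "latest", "none"}


def parse_properties(text: str) -> dict[str, str]:
    """Parse simple key=value lines, skipping blank lines and # / ! comments."""
    props: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_offset_reset(props: dict[str, str]) -> str:
    """Return the effective reset value, or raise if it does not suit this consumer."""
    if "zookeeper.connect" in props:
        allowed, default = OLD_CONSUMER_VALUES, "largest"
    elif "bootstrap.servers" in props:
        allowed, default = NEW_CONSUMER_VALUES, "latest"
    else:
        raise ValueError("cannot tell which consumer this config targets")
    # When the property is absent, the consumer's default applies.
    value = props.get("auto.offset.reset", default)
    if value not in allowed:
        raise ValueError(
            f"auto.offset.reset={value!r} is not valid here; expected one of {sorted(allowed)}"
        )
    return value
```

Run against the question's consumer.config, the check reports largest (the default), and it rejects earliest for that ZooKeeper-based config.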

Jobin Ar answered May 11 '23 10:05



Look at the auto.offset.reset parameter in the Kafka consumer configuration.

From the Kafka documentation:

auto.offset.reset (default: largest)

What to do when there is no initial offset in Zookeeper or if an offset is out of range:
* smallest : automatically reset the offset to the smallest offset
* largest : automatically reset the offset to the largest offset
* anything else: throw exception to the consumer.

If this is set to largest, the consumer may lose some messages when the number of partitions, for the topics it subscribes to, changes on the broker. To prevent data loss during partition addition, set auto.offset.reset to smallest
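The quoted rule explains the behaviour in the question: a brand-new group such as kafka-mirror has no committed offset, so the default largest applies and the consumer starts at the end of each partition. Below is a minimal Python model of that decision for a single partition. It is an illustration of the documented rule, not Kafka's code; committed, log_start and log_end are hypothetical inputs standing in for the group's stored offset and the partition's available offset range.

```python
from typing import Optional


def starting_offset(committed: Optional[int], log_start: int, log_end: int,
                    auto_offset_reset: str = "largest") -> int:
    """Pick where a consumer group starts reading one partition (illustrative model)."""
    # A committed offset that is still within the log is used as-is;
    # auto.offset.reset is consulted only when there is none or it is out of range.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    if auto_offset_reset == "smallest":
        return log_start  # replay everything still retained
    if auto_offset_reset == "largest":
        return log_end    # only messages produced from now on
    # Any other value: the documentation says an exception is thrown to the consumer.
    raise ValueError(f"no valid offset and auto.offset.reset={auto_offset_reset!r}")
```

For a new group on a partition holding offsets 0 to 500, starting_offset(None, 0, 500) returns 500, while starting_offset(None, 0, 500, "smallest") returns 0. Because a valid committed offset takes precedence, switching to smallest likely has no effect for a group that has already committed offsets; in that case a fresh group.id (or resetting the group's offsets) would probably be needed as well.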

So setting auto.offset.reset=smallest in consumer.config should fix your problem.
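A small sketch of applying that change to the text of a consumer.config file. with_offset_reset is a hypothetical helper, and it only handles simple key=value lines, not the full Java properties syntax (continuations, escapes, ':' separators).

```python
def with_offset_reset(config_text: str, value: str = "smallest") -> str:
    """Return config_text with auto.offset.reset set to value (added if missing)."""
    lines = []
    replaced = False
    for raw in config_text.splitlines():
        key = raw.split("=", 1)[0].strip()
        if key == "auto.offset.reset":
            # Overwrite an existing setting instead of adding a duplicate key.
            lines.append(f"auto.offset.reset={value}")
            replaced = True
        else:
            lines.append(raw)
    if not replaced:
        lines.append(f"auto.offset.reset={value}")
    return "\n".join(lines) + "\n"
```

For example, passing the question's consumer.config contents returns the same three settings followed by auto.offset.reset=smallest, which can be written back before restarting MirrorMaker.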

codejitsu answered May 11 '23 10:05
