I have set up MirrorMaker 2 to replicate data between two DCs.
My mm2.properties:
# mm2.properties
name=source->dest
clusters=source, dest
source.bootstrap.servers=localhost:9091
dest.bootstrap.servers=localhost:9092
source->dest.enabled=true
offset.storage.partitions=2
config.storage.replication.factor=1
status.storage.replication.factor=1
I see the following on MM2 startup:
[2020-02-16 07:31:07,547] INFO MirrorConnectorConfig values: 
    admin.timeout.ms = 60000
    checkpoints.topic.replication.factor = 3
    config.action.reload = restart
    config.properties.blacklist = [follower\.replication\.throttled\.replicas, leader\.replication\.throttled\.replicas, message\.timestamp\.difference\.max\.ms, message\.timestamp\.type, unclean\.leader\.election\.enable, min\.insync\.replicas]
    config.property.filter.class = class org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter
    connector.class = org.apache.kafka.connect.mirror.MirrorCheckpointConnector
    consumer.poll.timeout.ms = 1000
    emit.checkpoints.enabled = true
    emit.checkpoints.interval.seconds = 60
    emit.heartbeats.enabled = true
    emit.heartbeats.interval.seconds = 1
    enabled = true
    errors.log.enable = false
    errors.log.include.messages = false
    errors.retry.delay.max.ms = 60000
    errors.retry.timeout = 0
    errors.tolerance = none
    group.filter.class = class org.apache.kafka.connect.mirror.DefaultGroupFilter
    groups = [.*]
    groups.blacklist = [console-consumer-.*, connect-.*, __.*]
    header.converter = null
    heartbeats.topic.replication.factor = 3
    key.converter = null
    metric.reporters = null
    name = source->dest
    offset-syncs.topic.replication.factor = 3
    offset.lag.max = 100
    refresh.groups.enabled = true
    refresh.groups.interval.seconds = 600
    refresh.topics.enabled = true
    refresh.topics.interval.seconds = 600
    replication.factor = 2
    replication.policy.class = class org.apache.kafka.connect.mirror.DefaultReplicationPolicy
    replication.policy.separator = .
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    source.cluster.alias = source
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    sync.topic.acls.enabled = true
    sync.topic.acls.interval.seconds = 600
    sync.topic.configs.enabled = true
    sync.topic.configs.interval.seconds = 600
    target.cluster.alias = dest
    task.assigned.groups = null
    task.assigned.partitions = null
    tasks.max = 1
    topic.filter.class = class org.apache.kafka.connect.mirror.DefaultTopicFilter
    topics = [.*]
    topics.blacklist = [.*[\-\.]internal, .*\.replica, __.*]
    transforms = []
    value.converter = null
 (org.apache.kafka.connect.mirror.MirrorConnectorConfig:347)
My data is being replicated as expected. The source topic gets created in the destination cluster with the source. prefix (e.g. source.test-1). But the consumer group offsets are not being replicated.
Started a consumer group in the source cluster.
./kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic test-1 --group test-1-group
Consumed a few messages and stopped it. Then I posted new messages to this topic, and MirrorMaker mirrored the data to the target cluster as well.
I tried to consume messages from the target cluster as follows:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic source.test-1 --group test-1-group
Since I use the same consumer group, I was expecting my offsets to be synced as well, so that I would not re-consume the messages I had already consumed in cluster 1. But it still consumes all the messages. Is there anything I am missing here?
Consumers in a consumer group load-balance record processing. Each consumer remembers the offset where it left off reading, and each consumer group has its own committed offset per partition.
As we know, each message in a Kafka topic has a partition ID and an offset attached to it. Therefore, in order to "checkpoint" how far a consumer has read into a topic partition, the consumer regularly commits the offset of the latest processed message, also known as the consumer offset.
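For concreteness, here is a minimal sketch (assuming a plain Java client and the test-1 topic and test-1-group group from the question) of committing a group's offsets and reading back the per-partition checkpoint:

import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CommittedOffsetDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9091"); // source cluster from the question
        props.put("group.id", "test-1-group");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Set.of("test-1"));
            for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofSeconds(5))) {
                System.out.printf("partition=%d offset=%d value=%s%n", r.partition(), r.offset(), r.value());
            }
            // Commit the position for this group; this is the per-partition "checkpoint" Kafka stores.
            consumer.commitSync();
            // Read back what the group has committed for each assigned partition.
            Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
            committed.forEach((tp, om) ->
                    System.out.println(tp + " committed at " + (om == null ? "none" : om.offset())));
        }
    }
}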
Kafka MirrorMaker is a stand-alone tool for copying data between two Apache Kafka® clusters. It is little more than a Kafka consumer and producer hooked together: data is read from topics in the origin cluster and written to the destination cluster (legacy MirrorMaker keeps the same topic name; MirrorMaker 2 prefixes it with the source cluster alias, e.g. source.test-1, as you observed).
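To make "a consumer and producer hooked together" concrete, here is a highly simplified, hypothetical mirroring loop (not MirrorMaker's actual code, just the core idea, reusing the clusters and topic from the question):

import java.time.Duration;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class NaiveMirrorLoop {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9091"); // source cluster
        consumerProps.put("group.id", "naive-mirror");
        consumerProps.put("key.deserializer", ByteArrayDeserializer.class.getName());
        consumerProps.put("value.deserializer", ByteArrayDeserializer.class.getName());

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092"); // destination cluster
        producerProps.put("key.serializer", ByteArraySerializer.class.getName());
        producerProps.put("value.serializer", ByteArraySerializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(Set.of("test-1"));
            while (true) {
                for (ConsumerRecord<byte[], byte[]> r : consumer.poll(Duration.ofSeconds(1))) {
                    // Write to the renamed topic on the destination, as MM2's DefaultReplicationPolicy does.
                    producer.send(new ProducerRecord<>("source.test-1", r.key(), r.value()));
                }
            }
        }
    }
}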
There are several fundamental reasons why replicating offsets is non-trivial: the same record generally does not have the same offset in the source and target clusters (retention, compaction, and the point at which replication started all shift offsets), so committed offsets cannot simply be copied over; they have to be translated.
Looking at the MM2 KIP (KIP-382), there is an "offset sync topic" mentioned. In your code you can use the RemoteClusterUtils class to translate checkpoints between clusters:
Map<TopicPartition, OffsetAndMetadata> newOffsets = RemoteClusterUtils.translateOffsets(
    newClusterProperties, oldClusterName, consumerGroupId, Duration.ofSeconds(30));
// seek() takes one partition at a time, so apply each translated offset individually
newOffsets.forEach(consumer::seek);
This was taken from the following presentation: https://www.slideshare.net/ConfluentInc/disaster-recovery-with-mirrormaker-20-ryanne-dolan-cloudera-kafka-summit-london-2019
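Putting it together, here is a minimal sketch of a consumer starting on the destination cluster from translated offsets. It assumes Kafka 2.4+, the connect-mirror-client jar on the classpath, and the cluster aliases, ports, and group name from the question:

import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;

public class TranslateOffsetsDemo {
    public static void main(String[] args) throws InterruptedException, TimeoutException {
        // Properties pointing at the destination ("new") cluster, where the checkpoint topic lives.
        Map<String, Object> destProps = Map.<String, Object>of("bootstrap.servers", "localhost:9092");

        // Translate the group's committed offsets from the "source" cluster into destination offsets.
        Map<TopicPartition, OffsetAndMetadata> newOffsets = RemoteClusterUtils.translateOffsets(
                destProps, "source", "test-1-group", Duration.ofSeconds(30));

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "test-1-group");
        consumerProps.put("key.deserializer", StringDeserializer.class.getName());
        consumerProps.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            // Assign the translated partitions (e.g. source.test-1) and seek to the translated offsets.
            consumer.assign(newOffsets.keySet());
            newOffsets.forEach(consumer::seek);
            consumer.poll(Duration.ofSeconds(5)).forEach(r ->
                    System.out.printf("%s-%d@%d: %s%n", r.topic(), r.partition(), r.offset(), r.value()));
        }
    }
}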
Alternatively, you could use the seek-by-timestamp API to start your consumer group on the destination at roughly the time the data was delivered to the destination (or delivered to the source, if the broker settings for log-append timestamps on the destination don't overwrite those times). You'd need to rewind a little for safety.
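A minimal sketch of that alternative (the source.test-1 topic and the five-minute rewind are illustrative assumptions):

import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SeekByTimestampDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // destination cluster
        props.put("group.id", "test-1-group");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        // Target time: roughly when the source group stopped, rewound a little for safety.
        long targetTime = Instant.now().minus(Duration.ofMinutes(5)).toEpochMilli();

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            Map<TopicPartition, Long> timestamps = new HashMap<>();
            for (PartitionInfo info : consumer.partitionsFor("source.test-1")) {
                timestamps.put(new TopicPartition(info.topic(), info.partition()), targetTime);
            }
            consumer.assign(timestamps.keySet());

            // Find the earliest offset whose timestamp is >= targetTime on each partition, then seek there.
            Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
            offsets.forEach((tp, ot) -> {
                if (ot != null) {
                    consumer.seek(tp, ot.offset());
                }
            });
            consumer.poll(Duration.ofSeconds(5)).forEach(r ->
                    System.out.printf("%s-%d@%d: %s%n", r.topic(), r.partition(), r.offset(), r.value()));
        }
    }
}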