 

Kafka MirrorMaker2 - not mirroring consumer group offsets

I have set up MirrorMaker 2 to replicate data between two DCs.

My mm2.properties:

# mm2.properties
name=source->dest
clusters=source, dest

source.bootstrap.servers=localhost:9091
dest.bootstrap.servers=localhost:9092

source->dest.enabled=true

offset.storage.partitions=2
config.storage.replication.factor=1
status.storage.replication.factor=1
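
For comparison, later Kafka releases can sync group offsets automatically. A minimal sketch of the relevant settings, assuming Kafka 2.7+ where KIP-545 added `sync.group.offsets.enabled` (this option did not exist in the 2.4/2.5 releases current at the time of this question):

```properties
# Hypothetical additions, valid only on Kafka 2.7+ (KIP-545)
source->dest.sync.group.offsets.enabled=true
source->dest.sync.group.offsets.interval.seconds=60
# Checkpoints must be enabled for offset sync to work (true by default)
source->dest.emit.checkpoints.enabled=true
source->dest.emit.checkpoints.interval.seconds=60
```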

I see the following on MM2 startup:

[2020-02-16 07:31:07,547] INFO MirrorConnectorConfig values: 
    admin.timeout.ms = 60000
    checkpoints.topic.replication.factor = 3
    config.action.reload = restart
    config.properties.blacklist = [follower\.replication\.throttled\.replicas, leader\.replication\.throttled\.replicas, message\.timestamp\.difference\.max\.ms, message\.timestamp\.type, unclean\.leader\.election\.enable, min\.insync\.replicas]
    config.property.filter.class = class org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter
    connector.class = org.apache.kafka.connect.mirror.MirrorCheckpointConnector
    consumer.poll.timeout.ms = 1000
    emit.checkpoints.enabled = true
    emit.checkpoints.interval.seconds = 60
    emit.heartbeats.enabled = true
    emit.heartbeats.interval.seconds = 1
    enabled = true
    errors.log.enable = false
    errors.log.include.messages = false
    errors.retry.delay.max.ms = 60000
    errors.retry.timeout = 0
    errors.tolerance = none
    group.filter.class = class org.apache.kafka.connect.mirror.DefaultGroupFilter
    groups = [.*]
    groups.blacklist = [console-consumer-.*, connect-.*, __.*]
    header.converter = null
    heartbeats.topic.replication.factor = 3
    key.converter = null
    metric.reporters = null
    name = source->dest
    offset-syncs.topic.replication.factor = 3
    offset.lag.max = 100
    refresh.groups.enabled = true
    refresh.groups.interval.seconds = 600
    refresh.topics.enabled = true
    refresh.topics.interval.seconds = 600
    replication.factor = 2
    replication.policy.class = class org.apache.kafka.connect.mirror.DefaultReplicationPolicy
    replication.policy.separator = .
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    source.cluster.alias = source
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    sync.topic.acls.enabled = true
    sync.topic.acls.interval.seconds = 600
    sync.topic.configs.enabled = true
    sync.topic.configs.interval.seconds = 600
    target.cluster.alias = dest
    task.assigned.groups = null
    task.assigned.partitions = null
    tasks.max = 1
    topic.filter.class = class org.apache.kafka.connect.mirror.DefaultTopicFilter
    topics = [.*]
    topics.blacklist = [.*[\-\.]internal, .*\.replica, __.*]
    transforms = []
    value.converter = null
 (org.apache.kafka.connect.mirror.MirrorConnectorConfig:347)

My data is being replicated as expected. The source topic gets created in the destination cluster with the source cluster alias as a prefix (e.g. source.test-1). But the consumer group offsets are not being replicated.
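
The prefix comes from `DefaultReplicationPolicy`, which names remote topics as `<sourceClusterAlias><separator><topic>`. A stand-alone sketch of that naming rule (plain Java illustrating the behavior, not the actual MM2 class):

```java
public class RemoteTopicNaming {
    // Mirrors the behavior of DefaultReplicationPolicy.formatRemoteTopic():
    // remote topic name = source cluster alias + separator + original topic name
    static String formatRemoteTopic(String sourceAlias, String separator, String topic) {
        return sourceAlias + separator + topic;
    }

    public static void main(String[] args) {
        // "source" and "test-1" match the alias/topic used in this question;
        // "." is the default replication.policy.separator
        System.out.println(formatRemoteTopic("source", ".", "test-1")); // prints "source.test-1"
    }
}
```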

Started a consumer group in the source cluster.

./kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic test-1 --group test-1-group

I consumed a few messages and stopped the consumer. I then posted new messages to this topic, and MirrorMaker mirrored the data to the target cluster.

I then tried to consume messages from the target cluster as follows:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic source.test-1 --group test-1-group

Since I use the same consumer group, I was expecting the offsets to be synced as well, so that I would not re-consume the messages I had already consumed in cluster 1. But the consumer still reads all the messages from the beginning. Is there anything I am missing here?

asked Feb 16 '20 by user1578872



1 Answer

There are several fundamental reasons why replicating offsets is non-trivial:

  1. Kafka is an at-least-once system (ignoring the hype). Because MirrorMaker is built on top of Kafka consumers and producers that can each time out or disconnect, some degree of duplicate records will be delivered to the destination. This means offsets don't map 1:1 between source and destination. Even if you were to use the "exactly once" support (which the MM2 KIP clearly says it is not using), all it would do is skip over partially-delivered batches, but those batches would still occupy offsets at the destination.
  2. If you set up mirroring long after the source topic has started expiring records, your destination topic will start at offset 0 while the source will have much higher "oldest" offsets. There was an attempt to address this (see KIP-391), but it was never accepted.
  3. In general, there is no guarantee that your mirroring topology mirrors from a single source to a single destination. The LinkedIn topology, for example, mirrors from multiple source clusters into "aggregate" tier clusters. Mapping offsets is meaningless for such topologies.
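
Because offsets don't map 1:1, MM2's checkpoint mechanism works from periodic sync points instead: for a given upstream offset it finds the nearest earlier sync point and uses its downstream counterpart. A rough stand-alone sketch of that lookup (plain Java with made-up sync data, not the actual MirrorCheckpointTask code):

```java
import java.util.TreeMap;

public class OffsetTranslationSketch {
    // Sync points recorded on the offset-syncs topic: upstream offset -> downstream offset.
    // The values below are invented; duplicates delivered by MM2 make downstream
    // offsets drift ahead of upstream ones over time.
    static final TreeMap<Long, Long> SYNC_POINTS = new TreeMap<>();
    static {
        SYNC_POINTS.put(0L, 0L);      // first record mirrored cleanly
        SYNC_POINTS.put(100L, 103L);  // 3 duplicate records delivered by now
        SYNC_POINTS.put(200L, 205L);  // 5 duplicates total
    }

    static long translate(long upstreamOffset) {
        // Use the latest sync point at or before the upstream offset; anything past
        // it cannot be mapped precisely, so the result is deliberately conservative
        // (a failed-over consumer may re-read a few records, never skip any).
        var entry = SYNC_POINTS.floorEntry(upstreamOffset);
        return entry == null ? 0L : entry.getValue();
    }

    public static void main(String[] args) {
        System.out.println(translate(150L)); // prints 103 (last sync point <= 150)
    }
}
```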

Looking at the MM2 KIP, there is an "offset sync topic" mentioned. In your code you can use the RemoteClusterUtils class to translate checkpoints between clusters:

Map<TopicPartition, OffsetAndMetadata> newOffsets = RemoteClusterUtils.translateOffsets(
    newClusterProperties, oldClusterName, consumerGroupId, Duration.ofSeconds(30));
// KafkaConsumer.seek() takes a single partition at a time, so apply each offset in turn
newOffsets.forEach(consumer::seek);

This was taken from the following presentation: https://www.slideshare.net/ConfluentInc/disaster-recovery-with-mirrormaker-20-ryanne-dolan-cloudera-kafka-summit-london-2019

Alternatively, you could use the seek-by-timestamp API (KafkaConsumer.offsetsForTimes()) to start your consumer group on the destination at roughly the time the data was delivered to the destination (or delivered to the source, if the destination brokers' log-append-timestamp settings don't overwrite those times). You'd need to rewind a little for safety.
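
The seek-by-timestamp flow is: pick a target time slightly before the last known-good point (the safety rewind), query offsetsForTimes() with that timestamp for each partition, then seek() to the returned offsets. A stand-alone sketch of the rewind computation (plain Java; the consumer calls need a live cluster and kafka-clients, so they appear only in comments, and the timestamp/margin values are invented):

```java
import java.time.Duration;
import java.time.Instant;

public class TimestampFailover {
    // Rewind a bit before the last known-good time for safety: better to re-read
    // a few records (Kafka is at-least-once anyway) than to silently skip some.
    static long failoverStartMillis(Instant lastKnownGood, Duration safetyMargin) {
        return lastKnownGood.minus(safetyMargin).toEpochMilli();
    }

    public static void main(String[] args) {
        Instant lastKnownGood = Instant.parse("2020-02-16T07:00:00Z");
        long ts = failoverStartMillis(lastKnownGood, Duration.ofMinutes(5));
        // With a real consumer you would then do (pseudocode, requires kafka-clients):
        //   Map<TopicPartition, Long> query = Map.of(partition, ts);
        //   consumer.offsetsForTimes(query)
        //           .forEach((tp, oat) -> consumer.seek(tp, oat.offset()));
        System.out.println(ts);
    }
}
```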

answered Oct 28 '22 by radai