How to migrate consumer offsets using MirrorMaker 2.0?

With Kafka 2.7.0, I am using MirrorMaker 2.0 as a Kafka Connect connector to replicate all the topics from the primary Kafka cluster to the backup cluster.

All the topics are being replicated perfectly except __consumer_offsets. Below is the connector configuration:

{
    "name": "test-connector",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
      "topics.blacklist": "some-random-topic",
      "replication.policy.separator": "",
      "source.cluster.alias": "",
      "target.cluster.alias": "",
      "exclude.internal.topics":"false",
      "tasks.max": "10",
      "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
      "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
      "source.cluster.bootstrap.servers": "xx.xx.xxx.xx:9094",
      "target.cluster.bootstrap.servers": "yy.yy.yyy.yy:9094",
      "topics": "test-topic-from-primary,primary-kafka-connect-offset,primary-kafka-connect-config,primary-kafka-connect-status,__consumer_offsets"
    }
}
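
For reference, a minimal Java sketch of how I check which topics actually arrive on the target cluster; the bootstrap address is the placeholder target from the config above, and listInternal(true) is needed for internal topics such as __consumer_offsets to show up at all:

import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListTopicsOptions;

public class ListTargetTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder target-cluster address, same as in the connector config above
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "yy.yy.yyy.yy:9094");

        try (AdminClient admin = AdminClient.create(props)) {
            // listInternal(true) makes internal topics such as __consumer_offsets visible
            Set<String> topics = admin
                    .listTopics(new ListTopicsOptions().listInternal(true))
                    .names()
                    .get();
            topics.forEach(System.out::println);
        }
    }
}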

In a similar question here, the accepted answer says the following:

Add this in your consumer.config:

exclude.internal.topics=false

And add this in your producer.config:

client.id=__admin_client

Where do I add these in my configuration?

The connector configuration properties listed here do not include a property named client.id, though I have already set exclude.internal.topics to false.

Is there something I am missing here?

UPDATE

I learned that Kafka 2.7 and above support automated consumer offset sync using MirrorCheckpointTask, as mentioned here.

I have created a connector for this with the configuration below:

{
    "name": "mirror-checkpoint-connector",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
      "sync.group.offsets.enabled": "true",
      "source.cluster.alias": "",
      "target.cluster.alias": "",
      "exclude.internal.topics":"false",
      "tasks.max": "10",
      "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
      "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
      "source.cluster.bootstrap.servers": "xx.xx.xxx.xx:9094",
      "target.cluster.bootstrap.servers": "yy.yy.yyy.yy:9094",
      "topics": "__consumer_offsets"
    }
}

Still no luck. Is this the correct approach? Is something else needed?

1 Answer

You do not want to replicate __consumer_offsets directly. Offsets on the source and destination clusters will not match: records are re-produced on the target and can land at different offsets (earlier writes, retention, and compaction all shift them), so copied offsets would point at the wrong records.

MirrorMaker 2 provides offset translation for this instead. The MirrorCheckpointConnector populates the destination cluster with translated offsets generated from the source cluster. See KIP-545: https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0
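
With the checkpoint connector running, the translated offsets can be read on the target side via RemoteClusterUtils (from the connect-mirror-client artifact) and then applied with Consumer#seek or committed for the group. A minimal sketch, assuming the placeholder target address from the question, a hypothetical consumer group my-group, and a non-empty source.cluster.alias of primary:

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;

public class TranslatedOffsetsExample {
    public static void main(String[] args) throws Exception {
        // Client config for the *target* cluster, where MM2 stores its checkpoints
        Map<String, Object> targetClusterConfig = new HashMap<>();
        targetClusterConfig.put("bootstrap.servers", "yy.yy.yyy.yy:9094"); // placeholder from the question

        // "primary" is an assumed source.cluster.alias, "my-group" an assumed consumer group id
        Map<TopicPartition, OffsetAndMetadata> translated = RemoteClusterUtils.translateOffsets(
                targetClusterConfig, "primary", "my-group", Duration.ofSeconds(30));

        // These offsets can then be committed on the target or used with Consumer#seek
        // before resuming consumption there.
        translated.forEach((tp, offset) ->
                System.out.printf("%s -> %d%n", tp, offset.offset()));
    }
}

From Kafka 2.7 onward, setting sync.group.offsets.enabled=true on the MirrorCheckpointConnector (as in the update above) should also let MirrorMaker 2 write these translated offsets directly into the target cluster's consumer groups, provided those groups are not actively consuming on the target.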
