
Is it possible to replicate kafka topics without alias prefix with MirrorMaker2

I'm trying to set up replication between 2 clusters, but I don't want the topic names to change. For example, a topic called "some_topic" is automatically replicated as "cluster1.some_topic". I'm pretty sure this can be avoided, but I haven't found the config setting that does it.

My current config, "mirrormaker2.properties":

# Sample MirrorMaker 2.0 top-level configuration file
# Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties 

# specify any number of cluster aliases
clusters = cluster1, cluster2

# connection information for each cluster
cluster1.bootstrap.servers = host1:9092,host2:9092,host3:9092
cluster2.bootstrap.servers = rep_host1:9092,rep_host2:9092,rep_host3:9092

# enable and configure individual replication flows
cluster1->cluster2.enabled = true
cluster1->cluster2.topics = sometopic.*

# customize as needed
# replication.policy.separator = _
# sync.topic.acls.enabled = false
# emit.heartbeats.interval.seconds = 5
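
A note on the topics setting above: a minimal sketch, assuming each comma-separated entry is treated as a Java regular expression that must match the whole topic name. The class and method names are hypothetical and not part of Kafka. Under that assumption, the pattern sometopic.* would not select a topic named "some_topic".

```java
import java.util.List;
import java.util.regex.Pattern;

// Illustrative helper, not part of Kafka: models a comma-separated regex topic filter.
class TopicFilterSketch {

    // Returns true if the topic fully matches at least one of the comma-separated regexes.
    static boolean shouldReplicate(String topicsProperty, String topic) {
        for (String regex : topicsProperty.split(",")) {
            if (Pattern.matches(regex.trim(), topic)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String topics = "sometopic.*"; // value taken from the config above
        for (String t : List.of("sometopic", "sometopic-orders", "some_topic")) {
            System.out.println(t + " -> " + shouldReplicate(topics, t));
        }
    }
}
```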

for reference:

  • https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP-382:MirrorMaker2.0-RunningastandaloneMirrorMakerconnector
  • https://kafka.apache.org/24/javadoc/index.html?constant-values.html

Brendan Scullion asked Dec 18 '19 10:12


3 Answers

Starting with Kafka 3.0.0, it is sufficient to set

replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy

Also, the PrefixlessReplicationPolicy in marcin-wieloch's answer https://stackoverflow.com/a/60619233/12008693 no longer works with 3.0.0 (NullPointerException).
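
To illustrate the difference in naming, here is a rough sketch that contrasts a prefixing policy with an identity policy. The types below are hypothetical stand-ins, not Kafka's classes; they only model how a source topic name maps to a target topic name. The real IdentityReplicationPolicy may treat internal topics such as heartbeats differently, which this sketch leaves out.

```java
// Illustrative stand-ins, not Kafka's classes.
class NamingDemo {
    public static void main(String[] args) {
        NamingPolicy prefixing = new PrefixingNaming(".");
        NamingPolicy identity = new IdentityNaming();
        System.out.println(prefixing.formatRemoteTopic("cluster1", "some_topic"));
        System.out.println(identity.formatRemoteTopic("cluster1", "some_topic"));
    }
}

// Maps a topic on the source cluster to its name on the target cluster.
interface NamingPolicy {
    String formatRemoteTopic(String sourceClusterAlias, String topic);
}

// Default-style behaviour: alias + separator + topic.
class PrefixingNaming implements NamingPolicy {
    private final String separator;

    PrefixingNaming(String separator) {
        this.separator = separator;
    }

    @Override
    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
        return sourceClusterAlias + separator + topic;
    }
}

// Identity-style behaviour: the topic name is passed through unchanged.
class IdentityNaming implements NamingPolicy {
    @Override
    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
        return topic;
    }
}
```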

Gunther Vogel answered Sep 16 '22 15:09


I'm trying to set up replication between 2 clusters, but I need the same topic name in both clusters, without the cluster alias being added to it, using only settings in connect-mirror-maker.properties.

By default, replicated topics are renamed based on source cluster aliases.

    Source --> Target
    topic-1 --> source.topic-1

You can prevent topics from being renamed by setting the following properties to blank in your connector properties file. By default, replication.policy.separator is a period; setting it to blank, together with source.cluster.alias, gives the target topic the same name as the source topic.

replication.policy.separator=
source.cluster.alias=
target.cluster.alias=
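
The string arithmetic behind this can be sketched as follows, assuming the target name is built as alias + separator + topic. The helper class, and the fallback alias "source" borrowed from the example above, are hypothetical. The sketch does not show whether every MirrorMaker 2 version accepts blank aliases; it only shows that blank values in a properties file load as empty strings and leave the topic name untouched.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Illustrative helper, not part of Kafka.
class BlankPrefixSketch {

    // Builds the target topic name as alias + separator + topic.
    static String remoteTopic(Properties props, String topic) {
        String alias = props.getProperty("source.cluster.alias", "source"); // hypothetical fallback
        String separator = props.getProperty("replication.policy.separator", ".");
        return alias + separator + topic;
    }

    // Parses properties-file text; "key=" yields an empty-string value.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties defaults = parse("");
        Properties blank = parse("replication.policy.separator=\nsource.cluster.alias=\n");
        System.out.println(remoteTopic(defaults, "topic-1")); // source.topic-1
        System.out.println(remoteTopic(blank, "topic-1"));    // topic-1
    }
}
```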

Rohan Tiwari answered Sep 18 '22 15:09


To 'disable' topic prefixes and still have topic properties mirrored properly, I had to provide a custom replication policy that also overrides the topicSource method. Otherwise, non-default topic properties (e.g., "cleanup.policy=compact") were not mirrored, even after restarting MirrorMaker.

Here is the complete procedure that worked for me:

  1. Compile and package the following customized replication policy into a .jar file (full source code can be found here):
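package ch.mawileo.kafka.mm2;

import java.util.Map;

import org.apache.kafka.connect.mirror.DefaultReplicationPolicy;
import org.apache.kafka.connect.mirror.MirrorConnectorConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Replication policy that keeps the source topic name unchanged on the target cluster. */
public class PrefixlessReplicationPolicy extends DefaultReplicationPolicy {

  private static final Logger log = LoggerFactory.getLogger(PrefixlessReplicationPolicy.class);

  // Alias of the source cluster, read from the connector configuration.
  private String sourceClusterAlias;

  @Override
  public void configure(Map<String, ?> props) {
    super.configure(props);
    sourceClusterAlias = (String) props.get(MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS);
    if (sourceClusterAlias == null) {
      // Fail fast: topicSource cannot work without the alias.
      String logMessage = String.format("Property %s not found", MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS);
      log.error(logMessage);
      throw new RuntimeException(logMessage);
    }
  }

  // Keep the topic name as-is instead of prepending the source cluster alias.
  @Override
  public String formatRemoteTopic(String sourceClusterAlias, String topic) {
    return topic;
  }

  // The name no longer carries the alias, so report the configured source alias for any non-null topic.
  @Override
  public String topicSource(String topic) {
    return topic == null ? null : sourceClusterAlias;
  }

  // There is no prefix to strip, so no upstream topic name is derived.
  @Override
  public String upstreamTopic(String topic) {
    return null;
  }
}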
  2. Copy the .jar into the ${KAFKA_HOME}/libs directory
  3. Configure MirrorMaker 2 to use that replication policy by setting the replication.policy.class property in ${KAFKA_HOME}/config/mm2.properties:
  replication.policy.class=ch.mawileo.kafka.mm2.PrefixlessReplicationPolicy
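
As an optional sanity check after copying the .jar, the following sketch reports whether a class name can be loaded from the current classpath. The helper class is hypothetical and not part of Kafka or of this answer's project; it assumes it is run with the same classpath that MirrorMaker 2 uses.

```java
// Illustrative pre-flight check, not part of Kafka.
class PolicyClasspathCheck {

    // Returns true if the named class can be found without initializing it.
    static boolean isLoadable(String className) {
        try {
            Class.forName(className, false, PolicyClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String name = args.length > 0
            ? args[0]
            : "ch.mawileo.kafka.mm2.PrefixlessReplicationPolicy";
        System.out.println(name + " loadable: " + isLoadable(name));
    }
}
```

If this reports false when run against the jars in ${KAFKA_HOME}/libs, the policy .jar is probably missing from that directory or the class name in replication.policy.class is misspelled.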

Marcin Wieloch answered Sep 19 '22 15:09