I'm trying to set up replication between 2 clusters but don't want the topic names to be changed. For example, if I have a topic called "some_topic", it is automatically replicated to "cluster1.some_topic". I'm pretty sure this can be done, but I haven't found the correct config to change this.
My current config, "mirrormaker2.properties":
# Sample MirrorMaker 2.0 top-level configuration file
# Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties
# specify any number of cluster aliases
clusters = cluster1, cluster2
# connection information for each cluster
cluster1.bootstrap.servers = host1:9092,host2:9092,host3:9092
cluster2.bootstrap.servers = rep_host1:9092,rep_host2:9092,rep_host3:9092
# enable and configure individual replication flows
cluster1->cluster2.enabled = true
cluster1->cluster2.topics = sometopic.*
# customize as needed
# replication.policy.separator = _
# sync.topic.acls.enabled = false
# emit.heartbeats.interval.seconds = 5
for reference:
Starting with Kafka 3.0.0, it is sufficient to set
replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy
Also, the PrefixlessReplicationPolicy in marcin-wieloch's answer https://stackoverflow.com/a/60619233/12008693 no longer works with 3.0.0 (NullPointerException).
I'm trying to set up replication between 2 clusters, but need the same topic name in both clusters, without giving an alias in connect-mirror-maker.properties.
By default, replicated topics are renamed based on source cluster aliases.
Source --> Target
topic-1 --> source.topic-1
You can avoid topics being renamed by setting the following properties to blank in your connector properties file. The replication.policy.separator property defaults to a period; setting it to blank, along with source.cluster.alias, gives the target topic the same name as the source topic.
replication.policy.separator=
source.cluster.alias=
target.cluster.alias=
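The renaming rule described above (source alias, then separator, then topic name) can be sketched in a few lines of Java. This is a hand-rolled illustration, not Kafka's own code: the class RemoteTopicNaming and the method remoteTopic are hypothetical names, and the sketch only shows the string concatenation. Whether MirrorMaker 2 accepts a blank alias in a given version is not checked here.

```java
// Hand-rolled illustration of the naming rule; this is NOT Kafka's own class.
final class RemoteTopicNaming {

    // Prefix rule described above: source alias + separator + topic name.
    static String remoteTopic(String sourceAlias, String separator, String topic) {
        return sourceAlias + separator + topic;
    }

    public static void main(String[] args) {
        // Default separator "." with a source alias of "source".
        System.out.println(remoteTopic("source", ".", "topic-1")); // prints "source.topic-1"

        // Blank alias and blank separator leave the name unchanged.
        System.out.println(remoteTopic("", "", "topic-1")); // prints "topic-1"
    }
}
```

With both the alias and the separator empty, nothing is prepended, which is why the blank settings above are expected to yield the original topic name.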
To 'disable' topic prefixes and to have topic properties mirrored properly at the same time, I had to provide a customized replication policy which also overrides the topicSource method. Otherwise, non-default topic properties (e.g., "cleanup.policy=compact") were not mirrored, even after restarting MirrorMaker.
Here is the complete procedure that worked for me:
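package ch.mawileo.kafka.mm2;

import java.util.Map;
import org.apache.kafka.connect.mirror.DefaultReplicationPolicy;
import org.apache.kafka.connect.mirror.MirrorConnectorConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PrefixlessReplicationPolicy extends DefaultReplicationPolicy {

    private static final Logger log = LoggerFactory.getLogger(PrefixlessReplicationPolicy.class);

    private String sourceClusterAlias;

    // Remember the source cluster alias; fail fast if it is not configured.
    @Override
    public void configure(Map<String, ?> props) {
        super.configure(props);
        sourceClusterAlias = (String) props.get(MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS);
        if (sourceClusterAlias == null) {
            String logMessage = String.format("Property %s not found", MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS);
            log.error(logMessage);
            throw new RuntimeException(logMessage);
        }
    }

    // Keep the topic name unchanged on the target cluster (no alias prefix).
    @Override
    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
        return topic;
    }

    // Report every non-null topic as coming from the configured source cluster.
    @Override
    public String topicSource(String topic) {
        return topic == null ? null : sourceClusterAlias;
    }

    // Always null: without a prefix there is no separate upstream name to derive.
    @Override
    public String upstreamTopic(String topic) {
        return null;
    }
}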
Compile and package the class above into a .jar file and copy it into the ${KAFKA_HOME}/libs directory. Then set the replication.policy.class property in ${KAFKA_HOME}/config/mm2.properties:
replication.policy.class=ch.mawileo.kafka.mm2.PrefixlessReplicationPolicy