I am using a custom Kafka connector (written in Java, using Kafka Connect's Java API) to pull data from an outside source and store it in a topic. I need to set a custom partitioning strategy. I understand that setting a custom partitioner is possible in a Kafka producer by setting the partitioner.class property. However, this property doesn't appear to do anything for a Kafka connector. How do I configure Kafka Connect (I'm using the connect-standalone script to run my connector) to use a custom Partitioner that I write?
The source connector can control the partition to which each source record is written via the SourceRecord's partition field. This is the most straightforward approach if the connector is your own.
However, if you want to change how a source connector partitions each record, you can use a Single Message Transform (SMT) that overwrites the partition field of the source records. You'll likely have to write a custom SMT by implementing org.apache.kafka.connect.transforms.Transformation with your own partitioning logic, but that's actually a bit easier than writing a custom Kafka partitioner.
For example, here's a notional custom transformation that shows how to use configuration properties and how to create a new SourceRecord instance with the desired partition number. The partitioning logic itself is only a simple placeholder, but the sample should be a good starting point.
package io.acme.example;

import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.Transformation;

public class CustomPartitioner implements Transformation<SourceRecord> {

    private static final String MAX_PARTITIONS_CONFIG = "max.partitions";
    private static final String MAX_PARTITIONS_DOC = "The maximum number of partitions";
    private static final int MAX_PARTITIONS_DEFAULT = 1;

    /**
     * The definition of the configurations. We just define a single configuration property here,
     * but you can chain multiple "define" methods together. Complex configurations may warrant
     * pulling all the config-related things into a separate class that extends {@link AbstractConfig}
     * and adds helper methods (e.g., "getMaxPartitions()"), and you'd use that class to parse the
     * parameters in {@link #configure(Map)} rather than {@link AbstractConfig}.
     */
    private static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(MAX_PARTITIONS_CONFIG, Type.INT, MAX_PARTITIONS_DEFAULT, Importance.HIGH, MAX_PARTITIONS_DOC);

    private int maxPartitions;

    @Override
    public void configure(Map<String, ?> configs) {
        // store any configuration parameters as fields ...
        AbstractConfig config = new AbstractConfig(CONFIG_DEF, configs);
        maxPartitions = config.getInt(MAX_PARTITIONS_CONFIG);
    }

    @Override
    public SourceRecord apply(SourceRecord record) {
        // Compute the desired partition here; this key-hashing placeholder
        // stands in for your real partitioning logic
        Object key = record.key();
        int desiredPartition = key == null ? 0 : Math.abs(key.hashCode() % maxPartitions);
        // Then create the new record with all of the existing fields except the new partition
        return record.newRecord(record.topic(), desiredPartition, record.keySchema(), record.key(),
                record.valueSchema(), record.value(), record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public void close() {
        // do nothing
    }
}
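Once the SMT is compiled and on the classpath, you reference it in the connector's configuration. Here's a sketch of the relevant properties, assuming a hypothetical connector class and using the max.partitions property defined above:

# in the connector's properties file (names here are hypothetical)
name=my-source
connector.class=io.acme.example.MySourceConnector
transforms=partitioner
transforms.partitioner.type=io.acme.example.CustomPartitioner
transforms.partitioner.max.partitions=10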
The ConfigDef and AbstractConfig functionality is pretty useful and can do a lot more interesting things, including using custom validators and recommenders, as well as having configuration properties that are dependent on other properties. If you want to learn more about this, check out some of the existing Kafka Connect connectors that use the same framework.
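For instance, here's a sketch of how the ConfigDef above could attach one of Kafka's built-in validators so the worker rejects nonsensical values at startup (the property names reuse those from the example):

// Reject any max.partitions value below 1 at configuration time
private static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(MAX_PARTITIONS_CONFIG, Type.INT, MAX_PARTITIONS_DEFAULT,
                ConfigDef.Range.atLeast(1),    // built-in validator
                Importance.HIGH, MAX_PARTITIONS_DOC);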
One final thing. When running the Kafka Connect standalone or distributed workers, be sure to set the CLASSPATH environment variable to point to the JAR file containing your custom SMT, plus any JAR files your SMT depends on other than those provided by Kafka. The connect-standalone.sh and connect-distributed.sh commands will add the Kafka JARs to the classpath automatically.
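For example, with the standalone worker (the JAR path and properties files here are hypothetical):

export CLASSPATH=/path/to/custom-smt.jar
bin/connect-standalone.sh config/connect-standalone.properties my-source.properties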