
Setting Partition Strategy in a Kafka Connector

I am using a custom Kafka connector (written in Java, using Kafka Connect's Java API) to pull data from an outside source and store in a topic. I need to set a custom partitioning strategy. I understand that setting a custom partitioner is possible in a Kafka Producer by setting the partitioner.class property. However, this property doesn't appear to do anything for a Kafka connector. How do I configure Kafka Connect (I'm using the connect-standalone script to run my connector) to use a custom Partitioner that I write?

Asked Jun 28 '17 by Raisin Bran Dan

1 Answer

The source connector can control the partition to which each source record is written via the SourceRecord's partition field. This is the most straightforward approach if the connector is your own.
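If you are writing the connector yourself, a task can simply pass the desired topic partition when it constructs each SourceRecord. Here's a minimal sketch; the source-partition/offset maps, topic name, class name, and key-hash partitioning logic are illustrative placeholders, not something prescribed by Kafka Connect.

package io.acme.example;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.Collections;
import java.util.Map;

public class ExplicitPartitionExample {

  // Hypothetical helper: build a source record that targets an explicit topic partition
  public SourceRecord recordFor(String topic, String key, String value, int numPartitions) {
    Map<String, String> sourcePartition = Collections.singletonMap("source", "example");
    Map<String, Long> sourceOffset = Collections.singletonMap("offset", 0L);

    // Any partitioning scheme works here; a simple key hash is used purely for illustration
    int topicPartition = (key.hashCode() & Integer.MAX_VALUE) % numPartitions;

    return new SourceRecord(sourcePartition, sourceOffset, topic, topicPartition,
                            Schema.STRING_SCHEMA, key,
                            Schema.STRING_SCHEMA, value);
  }
}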

However, if you want to change how a source connector partitions each record, you can use a Single Message Transform (SMT) that overwrites the partition field of the source records. You'll likely have to write a custom SMT by implementing org.apache.kafka.connect.transforms.Transformation and using your own partitioning logic, but that's actually a bit easier than writing a custom Kafka partitioner.

For example, here's a notional custom transformation that shows how to use configuration properties and how to create a new SourceRecord instance with the desired partition number. Its partitioning logic is just a placeholder (a simple hash of the record key), but it should be a good starting point.

package io.acme.example;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.Transformation;

import java.util.Map;

public class CustomPartitioner implements Transformation<SourceRecord> {

  private static final String MAX_PARTITIONS_CONFIG = "max.partitions";
  private static final String MAX_PARTITIONS_DOC = "The maximum number of partitions";
  private static final int MAX_PARTITIONS_DEFAULT = 1;

  /** 
   * The definition of the configurations. We just define a single configuration property here,
   * but you can chain multiple "define" methods together. Complex configurations may warrant
   * pulling all the config-related things into a separate class that extends {@link AbstractConfig}
   * and adds helper methods (e.g., "getMaxPartitions()"), and you'd use this class to parse the
   * parameters in {@link #configure(Map)} rather than {@link AbstractConfig}.
   */
  private static final ConfigDef CONFIG_DEF = new ConfigDef()
      .define(MAX_PARTITIONS_CONFIG, Type.INT, MAX_PARTITIONS_DEFAULT,
              Importance.HIGH, MAX_PARTITIONS_DOC);

  private int maxPartitions;

  @Override
  public void configure(Map<String, ?> configs) {
    // store any configuration parameters as fields ...
    AbstractConfig config = new AbstractConfig(CONFIG_DEF, configs);
    maxPartitions = config.getInt(MAX_PARTITIONS_CONFIG);
  }

  @Override
  public SourceRecord apply(SourceRecord record) {
    // Compute the desired partition here. This placeholder simply spreads records across
    // the configured number of partitions by hashing the key; replace it with your own logic.
    Object key = record.key();
    int desiredPartition = key == null ? 0 : (key.hashCode() & Integer.MAX_VALUE) % maxPartitions;
    // Then create the new record with all of the existing fields except with the new partition ...
    return record.newRecord(record.topic(), desiredPartition,
                            record.keySchema(), record.key(),
                            record.valueSchema(), record.value(),
                            record.timestamp());
  }

  @Override
  public ConfigDef config() {
    return CONFIG_DEF;
  }

  @Override
  public void close() {
    // do nothing
  }
}
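To have the worker apply the SMT, register it in the connector configuration you pass to the connect-standalone script. A minimal sketch of the properties file, assuming the class above; the connector class, topic, and transform alias are placeholders:

# my-source-connector.properties (connector-specific settings omitted)
name=my-source-connector
connector.class=io.acme.example.MySourceConnector
topic=my-topic

# Register the custom SMT under an alias and pass its configuration
transforms=setPartition
transforms.setPartition.type=io.acme.example.CustomPartitioner
transforms.setPartition.max.partitions=4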

The ConfigDef and AbstractConfig functionality is pretty useful and can do a lot more interesting things, including using custom validators and recommenders, as well as having configuration properties that are dependent on other properties. If you want to learn more about this, check out some of the existing Kafka Connect connectors that also use this same framework.
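As a small illustration of those features, here's a sketch of a ConfigDef that attaches a range validator to the max.partitions property, so invalid values are rejected when the worker calls configure(). The class name is hypothetical and the validator choice is just an example:

package io.acme.example;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Range;
import org.apache.kafka.common.config.ConfigDef.Type;

public class PartitionerConfig {

  // Reject non-positive partition counts at configuration time
  public static final ConfigDef CONFIG_DEF = new ConfigDef()
      .define("max.partitions", Type.INT, 1, Range.atLeast(1),
              Importance.HIGH, "The maximum number of partitions");
}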

One final thing. When running the Kafka Connect standalone or distributed workers, be sure to set the CLASSPATH environment variable to point to the JAR file containing your custom SMT, plus any JAR files your SMT depends on other than those provided by Kafka. The connect-standalone.sh and connect-distributed.sh commands add the Kafka JARs to the classpath automatically.
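For example, a standalone run might look like the following; the JAR and properties file paths are placeholders for your own artifacts:

export CLASSPATH=/path/to/custom-smt-1.0.jar
bin/connect-standalone.sh config/connect-standalone.properties my-source-connector.properties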

Answered Sep 24 '22 by Randall Hauch