
Spring Kafka Partitioning

What is the difference in behavior between the two code snippets below for publishing a message?

Approach 1

Message<String> message = MessageBuilder.withPayload("testmsg")
        .setHeader(KafkaHeaders.MESSAGE_KEY, "key").setHeader(KafkaHeaders.TOPIC, "test").build();

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(message);

Approach 2

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", "testmsg");

Topic Config:

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test   PartitionCount:3    ReplicationFactor:1 Configs:
Topic: test  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
Topic: test  Partition: 1    Leader: 0   Replicas: 0 Isr: 0
Topic: test  Partition: 2    Leader: 0   Replicas: 0 Isr: 0

Observation:

With 3 consumers, one per partition, Approach 1 leads to all messages being consumed by a single consumer from a single partition. With Approach 2, consumption is split equally across the 3 partitions/consumers.

asked Aug 07 '17 21:08 by srini




1 Answer

The answer is in your code: the first approach provides a message key (KafkaHeaders.MESSAGE_KEY) alongside the topic, and the second does not.

The message key is used to determine the target partition when a partition isn't specified explicitly:

/**
 * computes partition for given record.
 * if the record has partition returns the value otherwise
 * calls configured partitioner class to compute the partition.
 */
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

where DefaultPartitioner does this:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
    int nextValue = nextValue(topic);
        ...
} else {
   // hash the keyBytes to choose a partition
   return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

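So all messages with the same key are sent to the same partition. In Approach 1 every message carries the key "key", so they all land on one partition. Messages without a key, as in Approach 2, are distributed across the topic's partitions in a round-robin manner.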
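
To see the effect without a broker, the selection logic above can be modeled in a few lines of plain Java. This is only a rough sketch under stated assumptions: the class PartitionSketch and its method choosePartition are hypothetical names, and Arrays.hashCode stands in for Kafka's murmur2 hash purely to keep the example dependency-free, so the concrete partition numbers will not match what a real producer picks. What it does illustrate is the rule itself: an explicit partition wins, a key always maps to the same partition, and keyless records rotate.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone model of the selection rule; not the Kafka client itself.
class PartitionSketch {
    // Counter that drives round-robin placement of keyless records.
    private final AtomicInteger counter = new AtomicInteger(0);
    private final int numPartitions;

    PartitionSketch(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Clears the sign bit so the modulo result is never negative.
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    int choosePartition(Integer explicitPartition, String key) {
        if (explicitPartition != null) {
            // An explicitly requested partition is used as-is.
            return explicitPartition;
        }
        if (key == null) {
            // No key: rotate through the partitions.
            return toPositive(counter.getAndIncrement()) % numPartitions;
        }
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Assumption: Arrays.hashCode is a stand-in for murmur2.
        return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        PartitionSketch sketch = new PartitionSketch(3);
        for (int i = 0; i < 6; i++) {
            System.out.println("keyed -> " + sketch.choosePartition(null, "key")
                    + ", keyless -> " + sketch.choosePartition(null, null));
        }
    }
}
```

Running main should print the same partition for every keyed record while the keyless column cycles through all three partitions. If the goal in Approach 1 is an even spread, the usual options are to vary the key per message or to omit it.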

answered Oct 14 '22 04:10 by Artem Bilan
