What is the difference in behavior between the two code snippets below for publishing a message?
Approach 1
Message<String> message = MessageBuilder.withPayload("testmsg")
        .setHeader(KafkaHeaders.MESSAGE_KEY, "key")
        .setHeader(KafkaHeaders.TOPIC, "test")
        .build();
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(message);
Approach 2
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", "testmsg");
Topic Config:
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:3 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Observation:
With 3 consumers, one per partition, Approach 1 results in all messages being consumed by a single consumer from a single partition. With Approach 2, consumption is split evenly across the 3 partitions and consumers.
Kafka Partitioning:
Partitioning takes the single topic log and breaks it into multiple logs, each of which can live on a separate node in the Kafka cluster. This way, the work of storing messages, writing new messages, and processing existing messages can be split among many nodes in the cluster.
To create a Kafka topic programmatically, introduce a configuration class annotated with @Configuration; this annotation indicates that the Java class can be used by Spring as a source of bean definitions. Alongside the topic name, you can specify the number of partitions for the topic.
For most implementations, a common rule of thumb is about 10 partitions per topic and 10,000 partitions per Kafka cluster. Going beyond those numbers can require additional monitoring and optimization.
Each partition is an ordered, immutable sequence of messages that is continually appended to—a commit log. The messages in the partitions are each assigned a sequential id number called the offset that uniquely identifies each message within the partition.
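To make the offset idea concrete, here is a minimal in-memory sketch of a single partition as an append-only log. The class name PartitionLog and its methods are hypothetical and exist only for illustration; this is not how Kafka stores data on disk.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of one partition: an append-only list where
// the offset of a message is simply its position in the list.
class PartitionLog {
    private final List<String> messages = new ArrayList<>();

    // Appends a message to the end of the log and returns its offset.
    long append(String message) {
        messages.add(message);
        return messages.size() - 1;
    }

    // Reads the message stored at the given offset.
    String read(long offset) {
        return messages.get((int) offset);
    }

    public static void main(String[] args) {
        PartitionLog log = new PartitionLog();
        long first = log.append("a");
        long second = log.append("b");
        System.out.println(first + " -> " + log.read(first));
        System.out.println(second + " -> " + log.read(second));
    }
}
```

Offsets are only meaningful within one partition: two partitions of the same topic each start counting from 0.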
But the answer is in your code. The first approach provides a messageKey along with the topic. The messageKey is used to determine the target partition when one isn't specified explicitly:
/**
 * computes partition for given record.
 * if the record has partition returns the value otherwise
 * calls configured partitioner class to compute the partition.
 */
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
where DefaultPartitioner does this:
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
    int nextValue = nextValue(topic);
    ...
} else {
    // hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
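So all messages with the same key are sent to the same partition. Messages without a key are distributed across the topic's partitions in a round-robin manner. That explains your observation: Approach 1 always uses the key "key", so every message lands in one partition, while Approach 2 sends no key at all.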
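As a rough illustration of that logic, here is a simplified, self-contained sketch. The class PartitionSketch and the method partitionFor are hypothetical names, and Arrays.hashCode is only a stand-in for Kafka's murmur2 hash, so the concrete partition numbers will not match what a real producer picks. The point is only the shape of the decision: a fixed key always maps to one partition, while a null key rotates.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified, hypothetical model of the partition choice described above.
// Arrays.hashCode is a stand-in for Kafka's murmur2 hash (an assumption
// made to keep the example dependency-free).
class PartitionSketch {
    private static final AtomicInteger counter = new AtomicInteger();

    static int partitionFor(byte[] keyBytes, int numPartitions) {
        if (keyBytes == null) {
            // No key: rotate through the partitions.
            return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
        }
        // Key present: the same bytes always map to the same partition.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "key".getBytes(StandardCharsets.UTF_8);
        for (int i = 0; i < 3; i++) {
            System.out.println("keyed -> " + partitionFor(key, 3)
                    + ", unkeyed -> " + partitionFor(null, 3));
        }
    }
}
```

If you want Approach 1 to spread messages across partitions, either omit the MESSAGE_KEY header or use a key that varies from message to message.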