This is how I produce message :
String json = gson.toJson(msg);
ProducerRecord<String, String> record = new ProducerRecord<>(kafkaProducerConfig.getTopic(), json);
long startTime = System.currentTimeMillis();
try {
RecordMetadata meta = producer.send(record).get(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
I have 15
partitions for this topic, I did not mention the partition key while producing, what will be the default partition assigned ?
Since you're sending no key as part of the record, it is null.
Kafka has a DefaultPartitioner that will round-robin any null keys over each partition.
For non-null keys, a Murmur2 hash is computed, then modulo'd by the number of partitions for the topic
If you are not defined any custom partition it will use the default partitioner as per the below rule
Below default, Partition implementation to get a better understanding
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With