Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Kafka partition key not working properly‏

I'm struggling with how to use the partition key mechanism properly. My logic is set the partition number as 3, then create three partition keys as "0", "1", "2", then use the partition keys to create three KeyedMessage such as

  • KeyedMessage(topic, "0", message)
  • KeyedMessage(topic, "1", message)
  • KeyedMessage(topic, "2", message)

After this, creating a producer instance to send out all the KeyedMessage.

I expecting each KeyedMessage should enter to different partitions according to the different partition keys, which means

  • KeyedMessage(topic, "0", message) go to Partition 0
  • KeyedMessage(topic, "1", message) go to Partition 1
  • KeyedMessage(topic, "2", message) go to Partition 2

I'm using Kafka-web-console to watch the topic status, but the result is not like what I'm expecting. KeyedMessage still go to partitions randomly, some times two KeyedMessage will enter the same partition even they have different partition keys.

To make my question more clear, I would like to post some Scala codes currently I have, and I'm using Kafka 0.8.2-beta, and Scala 2.10.4.

Here is the producer codes, I didn't use the custom partitioner.class :

  val props = new Properties()

  val codec = if(compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec

  props.put("compression.codec", codec.toString)
  props.put("producer.type", if(synchronously) "sync" else "async")
  props.put("metadata.broker.list", brokerList)
  props.put("batch.num.messages", batchSize.toString)
  props.put("message.send.max.retries", messageSendMaxRetries.toString)
  props.put("request.required.acks",requestRequiredAcks.toString)
  props.put("client.id",clientId.toString)

  val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))

  def kafkaMesssage(message: Array[Byte], partition: Array[Byte]): KeyedMessage[AnyRef, AnyRef] = {
     if (partition == null) {
       new KeyedMessage(topic,message)
     } else {
       new KeyedMessage(topic,partition,message)
     }
  }

  def send(message: String, partition: String = null): Unit = send(message.getBytes("UTF8"), if (partition == null) null else partition.getBytes("UTF8"))

  def send(message: Array[Byte], partition: Array[Byte]): Unit = {
    try {
      producer.send(kafkaMesssage(message, partition))
    } catch {
      case e: Exception =>
        e.printStackTrace
        System.exit(1)
    }       
  }

And here is how I use the producer, create a producer instance, then use this instance to send three message. Currently I create the partition key as Integer, then convert it to Byte Arrays:

  val testMessage = UUID.randomUUID().toString
  val testTopic = "sample1"
  val groupId_1 = "testGroup"

  print("starting sample broker testing")
  val producer = new KafkaProducer(testTopic, "localhost:9092")

  val numList = List(0,1,2);
  for (a <- numList) {
    // Create a partition key as Byte Array
    var key = java.nio.ByteBuffer.allocate(4).putInt(a).array()
    //Here I give a Array[Byte] key
    //so the second "send" function of producer will be called
    producer.send(testMessage.getBytes("UTF8"), key)
  }

Not sure whether my logic is incorrect or I didn't understand the partition key mechanism correctly. Anyone could provides some sample code or explanation would be great!

like image 974
Haoming Zhang Avatar asked Dec 09 '14 07:12

Haoming Zhang


Video Answer


3 Answers

People often assume partitioning is a way to separate business data on business categories, but this is not the right angle of viewing the partition.

Partitioning influence directly these subjects:

-performance (each partition can be consumed in parallel to other partitions)

-messages order (order of messages guaranteed only on partition level)

I will give an example how we create partitions:

You have a topic, say MyMessagesToWorld

You would like to transfer this topic (all MyMessagesToWorld) to some consumer.

You "weight" the whole "mass" of MyMessagesToWorld and found, that this is 10 kg.

You have following "business" categories in "MyMessagesToWorld ":

-messages to dad (D)

-messages to mom (M)

-messages to sis (S)

-messsages to grandma (G)

-messsages to teacher (T)

-messsages to girl friend (F)

You think, who is your consumers and found that your consumers are gnomes, that able consume like 1 Kg messages in an hour each.

You can employ up to 2 such gnomes.

1 gnome needs 10 hours to consume 10 kg messages, 2 gnomes need 5 hours.

So you decide employ all available gnomes to save time.

To create 2 "channels" for these 2 gnomes you create 2 partitions of this topic on Kafka. If you invision more gnomes, create more partitions.

You have 6 business categories inside and 2 sequential independent consumers - gnomes (consumer threads).

What to do?

Kafka's approach is following:

Suppose you have 2 kafka instances in cluster. (The same example OK , if you have more instaces in cluster)

You set partition number to 2 on Kafka, example(use Kafka 0.8.2.1 as example):

You define your topic in Kafka, telling, that you have 2 partitions for that topic:

kafka-topics.sh(.bat) --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic MyMessagesToWorld 

Now the topic MyMessagesToWorld has 2 partitions: P(0) and P(1).

You chose number 2 (partitions), because you know, you have(invision) only 2 consuming gnomes.

You may add more partitions later, when more consumer gnomes will be employed.

Do not confuse Kafka consumer with such gnome.

Kafka consumer can employ N gnomes. (N parallel threads)

Now you create KEYs for your messages.

You need KEYS to distribute your messages among partitions.

Keys will be these letters of "business categories", that you defined before: D,M,S,G,T,F, you think such letters are OK to be ID.

But in general case, whatever may be used as a Key: (complex objects and byte arrays, anything...)

If you create NO partitioner, the default one will be used.

The default partitioner is a bit stupid.

It takes hashcode of each KEY and divides it by number of available partitions , the "reminder" will defind the number of partition for that key.

Example:

KEY M, hashcode=12345, partition for M = 12345 % 2 = 1

As you can imagine, with such partitioner in the best case you have 3 business categories landing in each partition.

In worse case you can have all business categories landing in 1 partition.

If you had 100000 business categories, it will statistically OK to distribute them by such algorithm.

But with only few categories, you can have not very fair distribution.

So, you can rewrite partitioner and distribute your business categories a bit wiser.

There is an example:

This partitioner distributes business categories equally between available partitions.

public class CustomPartitioner {

private static Map<String, Integer> keyDistributionTable = new HashMap<String, Integer>();
private static AtomicInteger sequence = new AtomicInteger();
private ReentrantLock lock = new ReentrantLock();

public int partition(ProducerRecord<String, Object> record, Cluster cluster) {

    String key = record.key();
    int seq = figureSeq(key);

    List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(record.topic());

    if (availablePartitions.size() > 0) {
        int part = seq % availablePartitions.size();
        return availablePartitions.get(part).partition();
    } else {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
        int numPartitions = partitions.size();
        // no partitions are available, give a non-available partition
        return seq % numPartitions;
    }
}


private int figureSeq(String key) {
    int sequentualNumber = 0;
    if(keyDistributionTable.containsKey(key)){
        sequentualNumber = keyDistributionTable.get(key);
    }else{//synchronized region
        //used only for new Keys, so high waiting time for monitor expected only on start
        lock.lock();
        try{
            if(keyDistributionTable.containsKey(key)){
                sequentualNumber =  keyDistributionTable.get(key);
            }else{
                int seq = sequence.incrementAndGet();
                keyDistributionTable.put(key, seq);
                sequentualNumber =  seq;
            }
        }finally{
            lock.unlock();
        }
    }
    return sequentualNumber;
}

}

like image 140
Vladimir Nabokov Avatar answered Oct 13 '22 09:10

Vladimir Nabokov


The default partitioner looks at the key (as a byte array) and uses (% numPartitions) to convert that value to an integer value between 0 and the number of partitions-1 inclusive. The resulting integer is what determines the partition to which the message is written, not the value of the key as you're doing.

like image 43
Chris Gerken Avatar answered Oct 13 '22 10:10

Chris Gerken


Had the same issue - just switch to the ByteArrayParitioner:

props.put("partitioner.class", "kafka.producer.ByteArrayPartitioner")
like image 30
stubotnik Avatar answered Oct 13 '22 10:10

stubotnik