
Producing a Kafka message with a Null Value (Tombstone) from the Console

Is there any way to produce a message in the kafka-console-producer with a null value (ie. mark it for the compactor to delete it with a tombstone)?

I've tried producing "mykey" and "mykey|". The former produces an error and the latter sets the value to the empty string. I'm running the producer like this:

$KAFKA_HOME/bin/kafka-console-producer --broker-list localhost:9092 --topic mytopic --property "parse.key=true" --property "key.separator=|"
Asked Aug 28 '18 12:08 by David Seapy

2 Answers

Have a look at kafkacat (the netcat of Kafka). Quoting the docs:

Produce a tombstone (a "delete" for compacted topics) for key "abc" by providing an empty message value which -Z interprets as NULL:

echo "abc:" | kafkacat -b mybroker -t mytopic -Z -K:
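
To illustrate why a NULL value matters here and an empty string does not, below is a minimal sketch of compaction semantics in Python. It is a toy model, not broker code: the `compact` function and the sample log are hypothetical, and a real broker also keeps tombstones around for a retention period before dropping them.

```python
from typing import Dict, Iterable, Optional, Tuple

# (key, value); a value of None stands for a tombstone (hypothetical model)
Record = Tuple[bytes, Optional[bytes]]


def compact(log: Iterable[Record]) -> Dict[bytes, bytes]:
    """Toy model of log compaction: keep the latest value per key and
    drop a key entirely once its latest record is a tombstone."""
    latest: Dict[bytes, bytes] = {}
    for key, value in log:
        if value is None:
            latest.pop(key, None)  # tombstone: the key is removed
        else:
            latest[key] = value    # empty bytes is still a real value
    return latest


log = [(b"abc", b"v1"), (b"xyz", b"v2"), (b"abc", b""), (b"xyz", None)]
print(compact(log))  # {b'abc': b''}
```

In this model the key "abc" survives with an empty value, while "xyz" disappears because its last record carried a null value.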
Answered Sep 22 '22 14:09 by mathieu


Unfortunately, there is no way to do that with the console producer.

Here is the relevant snippet from the ConsoleProducer class in Kafka 0.11.0, showing how it reads its input (I don't think this has changed significantly between versions):

override def readMessage() = {
  lineNumber += 1
  print(">")
  (reader.readLine(), parseKey) match {
    // End of input: no more records
    case (null, _) => null
    // parse.key=true: split the line at the first key separator
    case (line, true) =>
      line.indexOf(keySeparator) match {
        // No separator found: send the line as a value without a key, or fail
        case -1 =>
          if (ignoreError) new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
          else throw new KafkaException(s"No key found on line $lineNumber: $line")
        // Separator at index n: nothing after it yields "", never null
        case n =>
          val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes(StandardCharsets.UTF_8)
          new ProducerRecord(topic, line.substring(0, n).getBytes(StandardCharsets.UTF_8), value)
      }
    // parse.key=false: the whole line is the value
    case (line, false) =>
      new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
  }
}

As you can see, the value is always a non-null byte array: when nothing follows the separator, it is the empty string encoded as bytes, never null.
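
For comparison, here is a rough Python sketch of the keyed branch of that parsing logic, extended with a null marker. The `parse_line` function and its `null_marker` parameter are hypothetical and only show what a reader would need in order to emit a tombstone; they are not part of the ConsoleProducer code quoted above.

```python
from typing import Optional, Tuple


def parse_line(line: str, key_separator: str = "|",
               null_marker: Optional[str] = None) -> Tuple[bytes, Optional[bytes]]:
    """Split a console line into (key, value), mirroring the keyed branch
    of readMessage. A value equal to null_marker (a hypothetical addition)
    becomes None, which stands for a tombstone."""
    n = line.find(key_separator)
    if n == -1:
        # Same outcome as producing "mykey" without a separator
        raise ValueError(f"No key found on line: {line}")
    key = line[:n].encode("utf-8")
    raw_value = line[n + len(key_separator):]
    if null_marker is not None and raw_value == null_marker:
        return key, None
    return key, raw_value.encode("utf-8")


print(parse_line("mykey|"))                          # (b'mykey', b'')
print(parse_line("mykey|NULL", null_marker="NULL"))  # (b'mykey', None)
```

Without a marker, "mykey|" can only ever map to an empty value, which matches the behaviour described in the question.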

Answered Sep 21 '22 14:09 by Natalia