Is there any way to produce a message in the kafka-console-producer with a null value (ie. mark it for the compactor to delete it with a tombstone)?
I've tried producing "mykey" and "mykey|". The former produces an error and the later makes the value the empty string. Running producer like this:
$KAFKA_HOME/bin/kafka-console-producer --broker-list localhost:9092 --topic mytopic --property "parse.key=true" --property "key.separator=|"
The messages always have a key-value structure; a key or value can be null.
Kafka Streams makes this possible through the usage of tombstone records, which are records that contain a non-null key, and a null value. When Kafka Streams sees a tombstone record, it deletes the corresponding key from the state store, thus freeing up space.
Step1: Start the zookeeper as well as the kafka server. Step2: Type the command: 'kafka-console-producer' on the command line. This will help the user to read the data from the standard inputs and write it to the Kafka topic.
You can modify Kafka topic properties by navigating to the Topics page, and editing content in the Configs tab of the topic Profile.
Have a look at kafkacat (the netcat of kafka). Quoting the doc:
Produce a tombstone (a "delete" for compacted topics) for key "abc" by providing an empty message value which -Z interpretes as NULL:
echo "abc:" | kafkacat -b mybroker -t mytopic -Z -K:
Unfortunately, there is no way to do that using console-producer
this is a code snippet from ConsoleProducer class (how it reads the data). Kafka 0.11.0 (don't think that it was changed significantly between different versions).
override def readMessage() = {
lineNumber += 1
print(">")
(reader.readLine(), parseKey) match {
case (null, _) => null
case (line, true) =>
line.indexOf(keySeparator) match {
case -1 =>
if (ignoreError) new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
else throw new KafkaException(s"No key found on line $lineNumber: $line")
case n =>
val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes(StandardCharsets.UTF_8)
new ProducerRecord(topic, line.substring(0, n).getBytes(StandardCharsets.UTF_8), value)
}
case (line, false) =>
new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
}
}
as you can see, the value is always an non-nullable array of bytes
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With