How do I insert JSON file data into a Kafka topic using kafka-console-producer? Can each JSON object be stored as a single message?
Example:
{
"id": 1,
"first_name": "John",
"last_name": "Lindt",
"email": "[email protected]",
"gender": "Male",
"ip_address": "1.2.3.4"
}
If you use this command -
cat sampledata.json|kafka-console-producer --broker-list localhost:9092 --topic stream-test-topic
Each line is taken as a separate message.
What is the right way to do this?
Thanks!
P.S.
The topic is being read by Elasticsearch. Sample JSON message file:
[{
"id": 1,
"first_name": "John",
"last_name": "Lindt",
"email": "[email protected]",
"gender": "Male",
"ip_address": "1.2.3.4"
}, {
"id": 2,
"first_name": "Peter",
"last_name": "Friz",
"email": "[email protected]",
"gender": "Male",
"ip_address": "4.5.6.7"
}, {
"id": 3,
"first_name": "Dell",
"last_name": "Chang",
"email": "[email protected]",
"gender": "Female",
"ip_address": "8.9.10.11"
}, {
"id": 4,
"first_name": "Lolita",
"last_name": "John",
"email": "[email protected]",
"gender": "Female",
"ip_address": "12.13.14.15"
}, {
"id": 5,
"first_name": "Pele",
"last_name": "Wang",
"email": "[email protected]",
"gender": "Male",
"ip_address": "16.17.18.19"
}, {
"id": 6,
"first_name": "Rene",
"last_name": "Charm",
"email": "[email protected]",
"gender": "Male",
"ip_address": "20.21.22.23"
}]
The Write JSON to a Kafka Topic Output Connector can be used to write event data, adapted and formatted as generic JSON, to an Apache Kafka Topic. For more information about getting started with Apache Kafka, see Apache Kafka Introduction.
You can insert data written to a file into Kafka and write data from a Kafka topic to the console.
If you have JSON messages in a file, you can write them to the Kafka topic as follows:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic user-timeline < samplerecords.json
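Since every input line becomes its own message, a pretty-printed JSON array like the sample file in the question would be split into many fragments. One possible workaround is to rewrite the file so that each object sits on a single line before piping it to the producer. The following is a minimal sketch using only the Python standard library; the function name `array_to_json_lines` and the script name `to_json_lines.py` are made up for illustration.

```python
import json
import sys
from typing import List


def array_to_json_lines(text: str) -> List[str]:
    """Turn a JSON array of objects into one compact JSON string per element."""
    records = json.loads(text)
    if not isinstance(records, list):
        # A single top-level object is treated as a one-element list.
        records = [records]
    # Compact separators drop the pretty-printing whitespace, and json.dumps
    # escapes newlines inside strings, so no record spans more than one line.
    return [json.dumps(record, separators=(",", ":")) for record in records]


if __name__ == "__main__" and len(sys.argv) > 1:
    # Hypothetical usage: python to_json_lines.py sampledata.json
    with open(sys.argv[1], encoding="utf-8") as handle:
        for line in array_to_json_lines(handle.read()):
            print(line)
```

The output could then be piped into the command from the question, for example `python to_json_lines.py sampledata.json | kafka-console-producer --broker-list localhost:9092 --topic stream-test-topic`, so that each of the six records arrives as one message.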
The console producer reads the input line by line using the default LineMessageReader. The default key and value serializers are StringSerializer. It does not validate whether a line is proper JSON; each line is treated as a raw string and published to the Kafka topic as is. If you want validation, you can set the following configuration in the console-producer command:
key.serializer
value.serializer
Example:
kafka-console-producer --broker-list localhost:9092 --topic testTopic --property value.serializer=custom.class.serialization.JsonSerializer
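If a custom serializer class is more than you need, validation can also happen before the data reaches the producer. The sketch below is one way to do that under the assumption that the input already holds one JSON document per line; it uses only the Python standard library, and the function name `split_valid_json` is hypothetical.

```python
import json
from typing import Iterable, List, Tuple


def split_valid_json(lines: Iterable[str]) -> Tuple[List[str], List[str]]:
    """Return (valid, invalid) lines; blank lines are skipped."""
    valid: List[str] = []
    invalid: List[str] = []
    for line in lines:
        candidate = line.strip()
        if not candidate:
            continue
        try:
            # Parsing is the validation step; the parsed value is discarded.
            json.loads(candidate)
        except json.JSONDecodeError:
            invalid.append(candidate)
        else:
            valid.append(candidate)
    return valid, invalid
```

Only the lines in `valid` would then be written out and piped to the console producer, while `invalid` can be logged or inspected separately.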
On the consumer side, you can take a similar approach: use a JsonDeserializer to read the data.
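Conceptually, a JSON deserializer just decodes the message bytes and parses them. The following is a rough Python sketch of that idea, not the JsonDeserializer class mentioned above; the name `deserialize_json` is an assumption, and UTF-8 encoding is assumed for the message value.

```python
import json
from typing import Any, Optional


def deserialize_json(raw: Optional[bytes]) -> Any:
    """Decode a message value (UTF-8 bytes) into a Python object."""
    if raw is None:
        # A message may carry no value at all; pass that through unchanged.
        return None
    # json.loads raises json.JSONDecodeError if the payload is not valid JSON.
    return json.loads(raw.decode("utf-8"))
```

With a Python client such as kafka-python, a function like this could be supplied as the `value_deserializer` of a `KafkaConsumer`, so malformed messages surface as exceptions at read time.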