I am passing a long JSON string to a Kafka topic, e.g.:
{
  "glossary": {
    "title": "example glossary",
    "GlossDiv": {
      "title": "S",
      "GlossList": {
        "GlossEntry": {
          "ID": "SGML",
          "SortAs": "SGML",
          "GlossTerm": "Standard Generalized Markup Language",
          "Acronym": "SGML",
          "Abbrev": "ISO 8879:1986",
          "GlossDef": {
            "para": "A meta-markup language, used to create markup languages such as DocBook.",
            "GlossSeeAlso": ["GML", "XML"]
          },
          "GlossSee": "markup"
        }
      }
    }
  }
}
and I want to create a stream from the Kafka topic that picks up all the fields, without specifying every field in KSQL, e.g.:
CREATE STREAM pageviews_original (*) WITH \
(kafka_topic='pageviews', value_format='JSON');
If you want the field names picked up automatically by KSQL, you need to use Avro. With Avro, the schema for the data is registered in the Confluent Schema Registry, and KSQL retrieves it automatically when you use the topic.
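With Avro the column list can be omitted entirely. A minimal sketch (the stream and topic names here are just the ones from your question):

```sql
-- No columns declared: KSQL fetches the schema from the Schema Registry
CREATE STREAM pageviews_original
  WITH (kafka_topic='pageviews', value_format='AVRO');
```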
If you are using JSON, you have to tell KSQL what the columns are. You declare them in the CREATE STREAM statement, using the STRUCT data type for nested elements.
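For the sample data in your question, that declaration would look roughly like this (the stream and topic names are assumptions; adjust them to your setup):

```sql
-- Full schema spelled out with nested STRUCTs and an ARRAY
CREATE STREAM glossary_stream (
  glossary STRUCT<
    title    VARCHAR,
    GlossDiv STRUCT<
      title     VARCHAR,
      GlossList STRUCT<
        GlossEntry STRUCT<
          ID        VARCHAR,
          SortAs    VARCHAR,
          GlossTerm VARCHAR,
          Acronym   VARCHAR,
          Abbrev    VARCHAR,
          GlossDef  STRUCT<
            para         VARCHAR,
            GlossSeeAlso ARRAY<VARCHAR>>,
          GlossSee  VARCHAR>>>>)
  WITH (kafka_topic='glossary_topic', value_format='JSON');
```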
You can partially work around listing every field by declaring only the top-level fields in the CREATE STREAM statement and then accessing nested elements with EXTRACTJSONFIELD for the fields you want to use. Be aware that there is an issue with this in 5.0.0, which will be fixed in 5.0.1. Also, you can't use it for nested arrays, which your sample data does contain.
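The workaround looks something like the following sketch: declare the top-level element as a plain string, then pull out nested values with a JSONPath expression (again, the stream and topic names are assumptions):

```sql
-- Declare only the top-level field, typed as a string
CREATE STREAM glossary_raw (glossary VARCHAR)
  WITH (kafka_topic='glossary_topic', value_format='JSON');

-- Extract nested values on demand with JSONPath
SELECT EXTRACTJSONFIELD(glossary, '$.GlossDiv.title') AS div_title
FROM glossary_raw;
```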