
Creating a KSQL Stream: How to extract value from complex json

I am trying to create a stream in Apache Kafka KSQL. The topic contains somewhat complex JSON:

{
  "agreement_id": "dd8afdbe-59cf-4272-b640-b14a24d8234c",
  "created_at": "2018-02-17 16:00:00.000Z",
  "id": "6db276a8-2efe-4495-9908-4d3fc4cc16fa",
  "event_type": "data",
  "total_charged_amount": {
    "tax_free_amount": null,
    "tax_amounts": [],
    "tax_included_amount": {
      "amount": 0.0241,
      "currency": "EUR"
    }
  },
  "used_service_units": [
    {
      "amount": 2412739,
      "currency": null,
      "unit_of_measure": "bytes"
    }
  ]
}

Creating a stream is easy for simple top-level fields such as event_type and created_at. That would look like this:

CREATE STREAM tstream (event_type varchar, created_at varchar) WITH (kafka_topic='usage_events', value_format='json');

But now I need to access used_service_units, and I would like to extract the "amount" from the JSON above.

How would I do this?

CREATE STREAM usage (event_type varchar,created_at varchar, used_service_units[0].amount int) WITH (kafka_topic='usage_events', value_format='json');

This results in:

line 1:78: mismatched input '[' expecting {'ADD', 'APPROXIMATE', ...

If I instead create the stream like this:

CREATE STREAM usage (event_type varchar,created_at varchar, used_service_units varchar) WITH (kafka_topic='usage_events', value_format='json');

and then run a SELECT on the stream in one of these ways:

SELECT EXTRACTJSONFIELD(used_service_units,'$.amount') FROM usage;
SELECT EXTRACTJSONFIELD(used_service_units[0],'$.amount') FROM usage;
SELECT EXTRACTJSONFIELD(used_service_units,'$[0].amount') FROM usage;

None of these alternatives works.

This one

SELECT EXTRACTJSONFIELD(used_service_units[0],'$.amount') FROM usage;

gave me

Code generation failed for SelectValueMapper
Tobias Eriksson asked May 18 '18 10:05





1 Answer

It seems that one solution to this problem is to declare the column's data type as an array, i.e.

CREATE STREAM usage (event_type varchar,created_at varchar, total_charged_amount varchar, used_service_units array<varchar> ) WITH (kafka_topic='usage_events', value_format='json');

Now I am able to do the following:

SELECT EXTRACTJSONFIELD(used_service_units[0],'$.amount') FROM usage;
Tobias Eriksson answered Sep 22 '22 15:09

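To make the idea behind the array<varchar> approach concrete, here is a minimal Python sketch of what the query appears to do: each array element is treated as its own piece of JSON text, and a path such as '$.amount' is then applied to one element. This is only an analogy, not KSQL's implementation. The helper names as_varchar_array and extract_json_field are hypothetical, the sample event is trimmed to the relevant fields, and the path handling supports only the simple '$.key' form.

```python
import json

# Sample event from the question, trimmed to the fields used here.
EVENT = json.loads("""
{
  "event_type": "data",
  "created_at": "2018-02-17 16:00:00.000Z",
  "used_service_units": [
    {"amount": 2412739, "currency": null, "unit_of_measure": "bytes"}
  ]
}
""")


def as_varchar_array(value):
    """Model an array<varchar> column: keep each element as its own JSON text."""
    return [json.dumps(element) for element in value]


def extract_json_field(json_text, path):
    """Hypothetical stand-in for EXTRACTJSONFIELD; supports only '$.key' paths."""
    if not path.startswith("$."):
        raise ValueError("this sketch only supports paths of the form '$.key'")
    node = json.loads(json_text)
    # A path like '$.amount' only makes sense on a JSON object, not on an array.
    value = node.get(path[2:]) if isinstance(node, dict) else None
    # Return the field as text, or None when it is missing or null.
    return None if value is None else str(value)


used_service_units = as_varchar_array(EVENT["used_service_units"])
amount = extract_json_field(used_service_units[0], "$.amount")
print(amount)
```

In this sketch the extracted value comes back as text, so it would need an explicit conversion (for example int(amount)) before doing arithmetic on it.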