Is there a way to split a message into multiple messages using KSQL and publish them to a new topic? Just to be clear, I am not looking for a Java-based listener that iterates over the messages and streams them to a new topic; instead, I am looking for a KSQL statement that does that for me.
For example:
Let's say I need the messages in the invoice topic split into item_inventory_delta messages.
key: saleschecknumber
message example:
{
    "total": 12.33,
    "salecounter": 1,
    "items": [
        {
            "itemId": 123,
            "quantity": 1
        },
        {
            "itemId": 345,
            "quantity": 5
        }
    ]
}
key: saleschecknumber_itemID
message examples:
1.
{
    "itemId": 123,
    "quantity": 1
}
2.
{
    "itemId": 345,
    "quantity": 5
}
As of ksqlDB 0.6 you can do this, thanks to the addition of the EXPLODE table function.
Given a topic invoice with JSON payload per your example, first inspect the topic using PRINT to dump its contents: 
ksql> PRINT invoice FROM BEGINNING;
Format:JSON
{"ROWTIME":1575366231505,"ROWKEY":"null","total":12.33,"salecounter":1,"items":[{"itemId":123,"quantity":1},{"itemId":345,"quantity":5}]}
Then declare a schema on top of the topic, which gives us a ksqlDB stream:
CREATE STREAM INVOICE (total DOUBLE, 
                       salecounter INT, 
                       items ARRAY<STRUCT<itemId INT, 
                                          quantity INT>>) 
                WITH (KAFKA_TOPIC='invoice', 
                      VALUE_FORMAT='JSON');
This simply "registers" the existing topic for use with ksqlDB. No new Kafka topics are written, until the next step.
Create a new Kafka topic, populated continually from the messages arriving in the source stream:
CREATE STREAM INVENTORY WITH (KAFKA_TOPIC='item_inventory_delta') AS 
  SELECT EXPLODE(ITEMS)->ITEMID AS ITEMID, 
         EXPLODE(ITEMS)->QUANTITY AS QUANTITY 
    FROM INVOICE;
The new topic has been created:
ksql> SHOW TOPICS;
 Kafka Topic                     | Partitions | Partition Replicas
-------------------------------------------------------------------
 invoice                         | 1          | 1
 item_inventory_delta            | 1          | 1
The topic has the delta messages as requested :)
ksql> PRINT item_inventory_delta;
Format:JSON
{"ROWTIME":1575366231505,"ROWKEY":"null","ITEMID":123,"QUANTITY":1}
{"ROWTIME":1575366231505,"ROWKEY":"null","ITEMID":345,"QUANTITY":5}
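To make the fan-out concrete, here is a minimal plain-Java sketch of what the EXPLODE step does to a single invoice record: one output row per element of the items array. It is only an illustration and not ksqlDB code; the names ExplodeSketch, Item and explode are made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of the fan-out; this is not ksqlDB code.
final class ExplodeSketch {

    // Hypothetical stand-in for one element of the "items" array.
    static final class Item {
        final int itemId;
        final int quantity;

        Item(int itemId, int quantity) {
            this.itemId = itemId;
            this.quantity = quantity;
        }
    }

    // Produces one output row per array element, mirroring
    // SELECT EXPLODE(ITEMS)->ITEMID, EXPLODE(ITEMS)->QUANTITY.
    static List<String> explode(List<Item> items) {
        List<String> rows = new ArrayList<>();
        for (Item item : items) {
            rows.add("{\"ITEMID\":" + item.itemId + ",\"QUANTITY\":" + item.quantity + "}");
        }
        return rows;
    }

    public static void main(String[] args) {
        // The two items from the example invoice in the question.
        List<Item> items = Arrays.asList(new Item(123, 1), new Item(345, 5));
        for (String row : explode(items)) {
            System.out.println(row);
        }
    }
}
```

An invoice with two items yields two rows, which matches the two messages seen in item_inventory_delta.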
There are many ways to handle this. As I understand it, the problem is about how incoming messages are processed rather than about aggregating them. An easy way is to use the Kafka Streams Processor API, which lets you customize the processing logic.
Kafka Streams Processor API
The Processor API allows developers to define and connect custom processors and to interact with state stores. With the Processor API, you can define arbitrary stream processors that process one received record at a time, and connect these processors with their associated state stores to compose the processor topology that represents a customized processing logic.
Note: You have not defined what the output key should be, so I am simply using the same content for both the key and the value. It is your choice; you can define your own output key and value.
You can define the topology with the Processor API as below:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;

Topology builder = new Topology();
builder.addSource("Source", "invoice")
                .addProcessor("sourceProcessor", () -> new InvoiceProcessor(), "Source")
                .addSink("sinkDeltaInvoice", "item_inventory_delta", Serdes.String().serializer(), Serdes.String().serializer(),
                        "sourceProcessor");
Below is the custom Processor. Please note it is just an approach, not a full implementation:
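import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;

class InvoiceProcessor implements Processor<String, String> {
        private Gson gson = new Gson();
        //constructor
        // .......
        private ProcessorContext context;
        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }
        @Override
        public void close() {
            // Any code for clean up would go here. This processor instance will not be used
            // again after this call.
        }
        @Override
        public void process(String key, String value) {
            try {
                // Inventory is a custom class that the invoice JSON is mapped to;
                // it holds the items as a List<Item> member.
                // The invoice JSON is the record value, so parse the value, not the key.
                Inventory inventory = gson.fromJson(value, Inventory.class);
                // Iterate over the items and forward each one as its own record
                for (Item item : inventory.getItems()) {
                    context.forward(gson.toJson(item), gson.toJson(item), To.child("sinkDeltaInvoice"));
                }
            } catch (JsonSyntaxException e) {
                // Malformed invoice JSON: the record is skipped here; handle it as your use case requires
            }
        }
}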
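The processor relies on Inventory and Item classes that are not shown. A minimal sketch of what they could look like follows, assuming Gson's default behaviour of mapping JSON fields to Java fields by name. The field layout is taken from the example invoice in the question; the DeltaKeys helper is a hypothetical addition that builds the saleschecknumber_itemID key the question asks for.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical value class for one entry of the "items" array.
final class Item {
    int itemId;    // matches the "itemId" JSON field
    int quantity;  // matches the "quantity" JSON field

    Item() {}      // no-arg constructor for JSON mapping

    Item(int itemId, int quantity) {
        this.itemId = itemId;
        this.quantity = quantity;
    }

    int getItemId() { return itemId; }
    int getQuantity() { return quantity; }
}

// Hypothetical value class for the whole invoice message.
final class Inventory {
    double total;
    int salecounter;
    List<Item> items = new ArrayList<>();

    List<Item> getItems() { return items; }
}

// Hypothetical helper, not part of the original answer.
final class DeltaKeys {
    // Builds "<saleschecknumber>_<itemId>", assuming the incoming record key
    // is the saleschecknumber as described in the question.
    static String keyFor(String salesCheckNumber, Item item) {
        return salesCheckNumber + "_" + item.getItemId();
    }
}
```

With something like this in place, the first argument of context.forward in process could be DeltaKeys.keyFor(key, item) instead of gson.toJson(item), so that each delta message carries the composite key.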