Kafka streams - KSQL - Split messages and publish to another topic

Is there a way to split a message into multiple messages using KSQL and publish them to a new topic? To be clear, I am not looking for a Java-based listener that iterates over the message and streams it to a new topic; I am looking for KSQL that does that for me.

For example:

Let's say I need messages in the invoice topic split into item_inventory_delta messages.

invoice topic

key: saleschecknumber

message example:

{
    "total": 12.33,
    "salecounter": 1,
    "items": [
        {
            "itemId": 123,
            "quantity": 1
        },
        {
            "itemId": 345,
            "quantity": 5
        }
    ]
}

item_inventory_delta topic

key: saleschecknumber_itemID

message examples

1.

{
    "itemId": 123,
    "quantity": 1
}

2.

{
    "itemId": 345,
    "quantity": 5
}
Asked Oct 31 '25 by so-random-dude
2 Answers

As of ksqlDB 0.6 you can now do this, thanks to the addition of the EXPLODE table function.

Given a topic invoice with JSON payload per your example, first inspect the topic using PRINT to dump its contents:

ksql> PRINT invoice FROM BEGINNING;
Format:JSON
{"ROWTIME":1575366231505,"ROWKEY":"null","total":12.33,"salecounter":1,"items":[{"itemId":123,"quantity":1},{"itemId":345,"quantity":5}]}

Then declare a schema on top of the topic, which gives us a ksqlDB stream:

CREATE STREAM INVOICE (total DOUBLE, 
                       salecounter INT, 
                       items ARRAY<STRUCT<itemId INT, 
                                          quantity INT>>) 
                WITH (KAFKA_TOPIC='invoice', 
                      VALUE_FORMAT='JSON');

This simply "registers" the existing topic for use with ksqlDB. No new Kafka topics are written, until the next step.
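To sanity-check the registered stream before creating anything new, you can run a transient push query against it (using the `EMIT CHANGES` syntax that ksqlDB 0.6 requires for continuous queries; the `LIMIT` just stops the query after one row):

```sql
SELECT TOTAL, SALECOUNTER, ITEMS FROM INVOICE EMIT CHANGES LIMIT 1;
```

This reads from the topic but, like `PRINT`, writes nothing back to Kafka.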

Create a new Kafka topic, populated continually from the messages arriving in the source stream:

CREATE STREAM INVENTORY WITH (KAFKA_TOPIC='item_inventory_delta') AS 
  SELECT EXPLODE(ITEMS)->ITEMID AS ITEMID, 
         EXPLODE(ITEMS)->QUANTITY AS QUANTITY 
    FROM INVOICE;

New topic has been created:

ksql> SHOW TOPICS;

 Kafka Topic                     | Partitions | Partition Replicas
-------------------------------------------------------------------
 invoice                         | 1          | 1
 item_inventory_delta            | 1          | 1

Topic has delta messages as requested :)

ksql> PRINT item_inventory_delta;
Format:JSON
{"ROWTIME":1575366231505,"ROWKEY":"null","ITEMID":123,"QUANTITY":1}
{"ROWTIME":1575366231505,"ROWKEY":"null","ITEMID":345,"QUANTITY":5}
Answered Nov 02 '25 by Robin Moffatt

There are many ways to handle this. In my understanding it is more about how we process incoming messages than about aggregating them. An easy way is to use the Kafka Streams Processor API, which allows you to customize the processing logic.

Kafka Stream Processor API

The Processor API allows developers to define and connect custom processors and to interact with state stores. With the Processor API, you can define arbitrary stream processors that process one received record at a time, and connect these processors with their associated state stores to compose the processor topology that represents a customized processing logic

Note: I have not defined a specific output value, so I am just posting the same JSON as both key and value, but it's your choice; you can define your own output key and value.

You can define the topology with the Processor API as below:

Topology builder = new Topology();
builder.addSource("Source", "invoice")
       .addProcessor("sourceProcessor", () -> new InvoiceProcessor(), "Source")
       .addSink("sinkDeltaInvoice", "item_inventory_delta",
                Serdes.String().serializer(), Serdes.String().serializer(),
                "sourceProcessor");

Below is the custom Processor approach; please note it is just an approach, not a full implementation:

class InvoiceProcessor implements Processor<String, String> {
    private final Gson gson = new Gson();
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void close() {
        // Any code for clean up would go here. This processor instance will not
        // be used again after this call.
    }

    @Override
    public void process(String key, String value) {
        try {
            // Map the JSON value onto a custom Inventory class;
            // List<Item> items is a member of Inventory
            Inventory inventory = gson.fromJson(value, Inventory.class);

            // Iterate over the items of the List<Item> and forward
            // one message per item to the delta sink
            for (Item item : inventory.getItems()) {
                context.forward(gson.toJson(item), gson.toJson(item),
                        To.child("sinkDeltaInvoice"));
            }
        } catch (JsonSyntaxException e) {
            // Handle malformed payloads as appropriate
        }
    }
}
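Stripped of the Kafka and Gson plumbing, the core of this answer is just flattening each invoice's items list into one record per item. A minimal plain-Java sketch of that logic (class and field names here are illustrative, not part of the original code), which also builds the saleschecknumber_itemID key the question asks for:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class InvoiceSplitSketch {
    // Illustrative stand-ins for the deserialized invoice payload
    record Item(int itemId, int quantity) {}
    record Invoice(double total, int saleCounter, List<Item> items) {}

    // One item_inventory_delta record per item, keyed saleschecknumber_itemID
    static Map<String, Item> split(String salesCheckNumber, Invoice invoice) {
        Map<String, Item> deltas = new LinkedHashMap<>();
        for (Item item : invoice.items()) {
            deltas.put(salesCheckNumber + "_" + item.itemId(), item);
        }
        return deltas;
    }

    public static void main(String[] args) {
        Invoice invoice = new Invoice(12.33, 1,
                List.of(new Item(123, 1), new Item(345, 5)));
        // Prints two delta records, keyed SC1001_123 and SC1001_345
        System.out.println(split("SC1001", invoice));
    }
}
```

Inside the processor above, the same loop would call `context.forward(...)` once per map entry instead of collecting the results.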
Answered Nov 02 '25 by Nitin

