
Newly built KTable returns nothing

I am trying to use a KTable to consume events from a Kafka topic, but it returns nothing. When I use a KStream on the same topic, it receives and prints the objects. This is really strange. The producer and consumer can be found here

// Not working
KTable<String, Customer> customerKTable = streamsBuilder.table(
        "customer",
        Consumed.with(Serdes.String(), customerSerde),
        Materialized.<String, Customer, KeyValueStore<Bytes, byte[]>>as(customerStateStore.name()));
customerKTable.foreach((key, value) -> System.out.println("Customer from Topic: " + value));

// KStream working
KStream<String, Customer> customerKStream = streamsBuilder.stream(
        "customer",
        Consumed.with(Serdes.String(), customerSerde));
customerKStream.foreach((key, value) -> System.out.println("Customer from Topic: " + value));
Pavan Jadda asked Oct 28 '22 13:10


1 Answer

After a lot of research, I found that the issue was in my syntax. The syntax I was using is valid according to the Confluent/Kafka documentation, but it does not work. I will raise a bug with the Kafka team. The syntax that does work is:

KTable<String, Customer> customerKTable = streamsBuilder.table(
        "customer",
        Materialized.<String, Customer, KeyValueStore<Bytes, byte[]>>as(customerStateStore.name())
                .withKeySerde(Serdes.String())
                .withValueSerde(customerSerde));

I had to include withKeySerde() and withValueSerde() on the Materialized instance to make the KTable work, but this is not mentioned in the Confluent/Kafka documentation.

Pavan Jadda answered Nov 08 '22 04:11