How to create a state store with a HashMap as value in Kafka Streams?

I need to create a state store with String keys and HashMap values. I tried the two methods below.

// First method
StateStoreSupplier avgStoreNew = Stores.create("AvgsNew")
          .withKeys(Serdes.String())
          .withValues(HashMap.class)
          .persistent()
          .build();

// Second method
HashMap<String ,Double> h = new HashMap<String ,Double>();

StateStoreSupplier avgStore1 = Stores.create("Avgs")
          .withKeys(Serdes.String())
          .withValues(Serdes.serdeFrom(h.getClass()))
          .persistent()
          .build();

The code compiles without any errors, but I get a runtime error:

io.confluent.examples.streams.WordCountProcessorAPI
Exception in thread "main" java.lang.IllegalArgumentException: Unknown class for built-in serializer

Can someone suggest the correct way to create such a state store?

Samy asked Aug 29 '16 05:08




1 Answer

If you want to create a state store, you need to provide a serializer and a deserializer for the key and value types you want to use. In Kafka Streams, the Serde abstraction wraps a serializer and a deserializer in a single class.

If you use .withValues(Class<V> valueClass), the value class is subject to the same restriction that the Javadoc states for keys:

@param keyClass the class for the keys, which must be one of the types for which Kafka has built-in serdes

Because there is no built-in Serde for HashMap, you need to implement one first (for example, a class called HashMapSerde) and pass an instance of it to .withValues(Serde<V> valueSerde). This is also why your second method fails: Serdes.serdeFrom(Class) only supports the built-in types, too. Furthermore, you must implement the actual serializer and deserializer for HashMap. If you know the generic types of your HashMap, you should specify them, which makes implementing the serializer and deserializer much simpler.

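Something like this (a sketch, assuming HashMap<String, Double> values as in your second method; the binary format is an arbitrary choice):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

// Serde for HashMap<String, Double>. Format: entry count (int), then per entry
// the key via writeUTF (max 65535 encoded bytes) and the value as an 8-byte double.
// Null keys or values inside the map are not supported.
// Usage: .withValues(new HashMapSerde()) instead of .withValues(HashMap.class)
public class HashMapSerde implements Serde<HashMap<String, Double>> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    public void close() {
        // nothing to release
    }

    @Override
    public Serializer<HashMap<String, Double>> serializer() {
        return new Serializer<HashMap<String, Double>>() {
            @Override
            public void configure(Map<String, ?> configs, boolean isKey) { }

            @Override
            public byte[] serialize(String topic, HashMap<String, Double> data) {
                if (data == null) {
                    return null; // pass nulls through
                }
                ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                try (DataOutputStream out = new DataOutputStream(bytes)) {
                    out.writeInt(data.size());
                    for (Map.Entry<String, Double> entry : data.entrySet()) {
                        out.writeUTF(entry.getKey());
                        out.writeDouble(entry.getValue());
                    }
                } catch (IOException e) {
                    throw new SerializationException("Could not serialize HashMap", e);
                }
                return bytes.toByteArray(); // stream is closed (and flushed) here
            }

            @Override
            public void close() { }
        };
    }

    @Override
    public Deserializer<HashMap<String, Double>> deserializer() {
        return new Deserializer<HashMap<String, Double>>() {
            @Override
            public void configure(Map<String, ?> configs, boolean isKey) { }

            @Override
            public HashMap<String, Double> deserialize(String topic, byte[] data) {
                if (data == null) {
                    return null;
                }
                try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
                    int size = in.readInt();
                    HashMap<String, Double> map = new HashMap<>();
                    for (int i = 0; i < size; i++) {
                        // arguments are evaluated left to right: key first, then value
                        map.put(in.readUTF(), in.readDouble());
                    }
                    return map;
                } catch (IOException e) {
                    throw new SerializationException("Could not deserialize HashMap", e);
                }
            }

            @Override
            public void close() { }
        };
    }
}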
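If the key and value types of the map are not fixed in advance, one possible fallback (not part of the original answer) is Java's built-in object serialization, because HashMap implements Serializable. The helper below is a minimal sketch; the class name JavaSerializationCodec and its encode/decode methods are hypothetical. Its two methods could be called from a Serializer and a Deserializer and combined via Serdes.serdeFrom(serializer, deserializer).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;

// Hypothetical helper: converts a HashMap to bytes and back using Java object
// serialization. Works as long as all keys and values are Serializable.
final class JavaSerializationCodec {

    private JavaSerializationCodec() { }

    static byte[] encode(HashMap<?, ?> map) {
        if (map == null) {
            return null; // pass nulls through, like Kafka's built-in serializers
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(map);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray(); // read only after close() has flushed the stream
    }

    @SuppressWarnings("unchecked") // the caller asserts the stored key/value types
    static <K, V> HashMap<K, V> decode(byte[] data) {
        if (data == null) {
            return null;
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (HashMap<K, V>) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Class of a stored object is not on the classpath", e);
        }
    }
}
```

The trade-off: the bytes are larger than a hand-written format, they are tied to the Java classes stored in the map, and deserializing untrusted data with Java serialization is a known security risk. When the types are known, a typed format like the HashMapSerde sketch is usually the better choice.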

For examples of how to implement (de)serializers and a Serde, have a look at the built-in ones in https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/common/serialization and at https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java

Matthias J. Sax answered Nov 15 '22 11:11