I need to create a state store with a String key and a HashMap as the value. I tried the two methods below.
// First method
StateStoreSupplier avgStoreNew = Stores.create("AvgsNew")
.withKeys(Serdes.String())
.withValues(HashMap.class)
.persistent()
.build();
// Second method
HashMap<String ,Double> h = new HashMap<String ,Double>();
StateStoreSupplier avgStore1 = Stores.create("Avgs")
.withKeys(Serdes.String())
.withValues(Serdes.serdeFrom(h.getClass()))
.persistent()
.build();
The code compiles fine without any error, but I get a runtime error:
io.confluent.examples.streams.WordCountProcessorAPI Exception in thread "main" java.lang.IllegalArgumentException: Unknown class for built-in serializer
Can someone suggest the correct way to create a state store?
State. Kafka Streams provides so-called state stores, which stream processing applications can use to store and query data. This is an important capability when implementing stateful operations.
KStream is an abstraction of a record stream of KeyValue pairs, i.e., each record is an independent entity/event in the real world. For example a user X might buy two items I1 and I2, and thus there might be two records <K:I1>, <K:I2> in the stream.
A stream is the most important abstraction provided by Kafka Streams: it represents an unbounded, continuously updating data set. A stream is an ordered, replayable, and fault-tolerant sequence of immutable data records, where a data record is defined as a key-value pair.
KStream, KTable and GlobalKTable. Kafka Streams provides abstractions for streams and tables. KStream handles a stream of records. KTable, on the other hand, manages a changelog stream and holds the latest state for each key; each data record represents an update.
If you want to create a state store, you need to provide a serializer and deserializer class for the type you want to use. In Kafka Streams, there is a single abstraction called Serde that wraps the serializer and deserializer in a single class.
If you use .withValues(Class<V> valueClass), the following must hold:
@param valueClass the class for the values, which must be one of the types for which Kafka has built-in serdes
The same restriction applies to Serdes.serdeFrom(Class), which is why both of your attempts fail with the same exception. Because there is no built-in Serde for HashMap, you need to implement one first (maybe called HashMapSerde) and pass an instance of it to the method .withValues(Serde<V> valueSerdes). Furthermore, you must implement the actual serializer and deserializer for HashMap, too. If you know the generic types of your HashMap, you should specify them (which makes the implementation of the serializer and deserializer much simpler).
Something like this (just a sketch; generic types omitted):
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class HashMapSerde implements Serde<HashMap> {
    // Interface methods must be declared public in the implementing class.
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        /* put your code here */
    }

    @Override
    public void close() {
        /* put your code here */
    }

    @Override
    public Serializer<HashMap> serializer() {
        return new Serializer<HashMap>() {
            @Override
            public void configure(Map<String, ?> configs, boolean isKey) {
                /* put your code here */
            }

            @Override
            public byte[] serialize(String topic, HashMap data) {
                /* put your code here: return the map encoded as bytes */
            }

            @Override
            public void close() {
                /* put your code here */
            }
        };
    }

    @Override
    public Deserializer<HashMap> deserializer() {
        return new Deserializer<HashMap>() {
            @Override
            public void configure(Map<String, ?> configs, boolean isKey) {
                /* put your code here */
            }

            @Override
            public HashMap deserialize(String topic, byte[] data) {
                /* put your code here: rebuild the map from the bytes */
            }

            @Override
            public void close() {
                /* put your code here */
            }
        };
    }
}
If you want to see examples of how to implement (de)serializers and Serde, have a look at https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/common/serialization and https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
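To make the "put your code here" parts more concrete, here is one possible way to fill in the serialize and deserialize bodies for a HashMap<String, Double>, using only the JDK. This is a minimal sketch, not the only or official approach: the class name HashMapCodec and the byte layout (entry count, then each key and value) are made up for illustration, and it assumes non-null keys and values and keys short enough for DataOutputStream.writeUTF.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not part of Kafka): binary encoding for HashMap<String, Double>. */
final class HashMapCodec {
    private HashMapCodec() {}

    /** Assumed layout: entry count (int), then for each entry a UTF string key and a double value. */
    static byte[] serialize(HashMap<String, Double> map) {
        if (map == null) {
            return null; // pass null through unchanged
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(map.size());
            for (Map.Entry<String, Double> entry : map.entrySet()) {
                out.writeUTF(entry.getKey());      // assumes a non-null key
                out.writeDouble(entry.getValue()); // assumes a non-null value
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return bytes.toByteArray();
    }

    /** Reverses serialize(): reads the entry count, then that many key/value pairs. */
    static HashMap<String, Double> deserialize(byte[] data) {
        if (data == null) {
            return null;
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int size = in.readInt();
            HashMap<String, Double> map = new HashMap<>();
            for (int i = 0; i < size; i++) {
                String key = in.readUTF();
                double value = in.readDouble();
                map.put(key, value);
            }
            return map;
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }
}
```

With helpers like these, the anonymous Serializer and Deserializer in the sketch above could simply delegate to HashMapCodec.serialize(data) and HashMapCodec.deserialize(data), and the store would then be built with .withValues(new HashMapSerde()) instead of .withValues(HashMap.class).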