I'm building a topology and want to use KStream.process() to write some intermediate values to a database. This step doesn't change the nature of the data and is completely stateless.
Adding a Processor requires creating a ProcessorSupplier and passing that instance to the KStream.process() function along with the name of a state store. This is what I don't understand.
How do I add a StateStore object to a topology, given that it requires a StateStoreSupplier?
Failing to add the StateStore gives this error when the application is started:
Exception in thread "main" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: StateStore my-state-store is not added yet.
Why is it necessary for a processor to have a state store? It seems that this could well be optional for processors that are stateless and don't maintain state.
Here's a simple example of how to use state stores, taken from the Confluent Platform documentation on Kafka Streams.
Step 1: Defining the StateStore/StateStoreSupplier:
StateStoreSupplier countStore = Stores.create("Counts")
    .withKeys(Serdes.String())
    .withValues(Serdes.Long())
    .persistent()
    .build();
You wrote: "I don't see a way to add a StateStore object to my topology. It requires a StateStoreSupplier as well though."
Step 2: Adding the state store to your topology.
Option A - When using the Processor API:
TopologyBuilder builder = new TopologyBuilder();
// Add the source processor node that takes Kafka topic "source-topic" as input
builder.addSource("Source", "source-topic")
    .addProcessor("Process", () -> new WordCountProcessor(), "Source")
    // Add the countStore associated with the WordCountProcessor processor
    .addStateStore(countStore, "Process")
    .addSink("Sink", "sink-topic", "Process");
Option B - When using the Kafka Streams DSL:
Here you need to call KStreamBuilder#addStateStore() with your StateStoreSupplier to add the state store to your processor topology. Then, when calling methods such as KStream#process() or KStream#transform(), you must also pass in the name of the state store -- otherwise your application will fail at runtime.
Using KStream#transform() as an example:
KStreamBuilder builder = new KStreamBuilder();
// Add the countStore that will be used within the Transformer[Supplier]
// that we pass into `transform()` below.
builder.addStateStore(countStore);
KStream<byte[], String> input = builder.stream("source-topic");
KStream<String, Long> transformed =
    input.transform(/* your TransformerSupplier */, countStore.name());
You asked: "Why is it necessary for a processor to have a state store? It seems that this could well be optional for processors that are stateless and don't maintain state."
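You are right -- you don't need a state store if your processor does not maintain state. The state store names passed to KStream#process() are a varargs parameter, so a stateless processor can simply omit them. The TopologyBuilderException in your question appears when you pass a store name that was never added to the topology.
If you do need a state store when using the DSL, call KStreamBuilder#addStateStore() with the StateStoreSupplier to add the store to your processor topology, and then reference it by name later on.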
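For the stateless case from the question, a minimal sketch might look like the following. It assumes the same 0.10.x-era API as the snippets above (KStreamBuilder, AbstractProcessor). The class names StatelessProcessExample and DatabaseWriter and the in-memory WRITTEN list are hypothetical stand-ins for your own code and database. The topology is only built here, not started.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.AbstractProcessor;

class StatelessProcessExample {

    // Hypothetical in-memory stand-in for the database.
    static final List<String> WRITTEN = new ArrayList<>();

    // A stateless processor: it performs one side effect per record
    // and never looks up a state store.
    static class DatabaseWriter extends AbstractProcessor<String, String> {
        @Override
        public void process(String key, String value) {
            // Replace this line with a real database write.
            WRITTEN.add(key + "=" + value);
        }
    }

    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> input = builder.stream("source-topic");

        // No state store names are passed here, and no store is added to the builder.
        input.process(DatabaseWriter::new);
    }
}
```

Note that in this API KStream#process() returns void, so any further processing of the same records has to be attached to `input` itself rather than chained after process().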