How to register a stateless processor (that seems to require a StateStore as well)?

I'm building a topology and want to use KStream.process() to write some intermediate values to a database. This step doesn't change the nature of the data and is completely stateless.

Adding a Processor requires creating a ProcessorSupplier and passing that instance to KStream.process() along with the name of a state store. This is the part I don't understand.

How do I add a StateStore object to a topology, given that doing so requires a StateStoreSupplier?

Failing to add the named StateStore gives this error when the application is started:

Exception in thread "main" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: StateStore my-state-store is not added yet.

Why is it necessary for a processor to have a state store? It seems that this could well be optional for processors that are stateless and don't maintain state.

For reference, the Javadoc for KStream.process() says only: "Process all elements in this stream, one element at a time, by applying a Processor."

ethrbunny asked Aug 22 '16 11:08


1 Answer

Here's a simple example of how to use state stores, taken from the Confluent Platform documentation on Kafka Streams.

Step 1: Defining the StateStore/StateStoreSupplier:

StateStoreSupplier countStore = Stores.create("Counts")
                                      .withKeys(Serdes.String())
                                      .withValues(Serdes.Long())
                                      .persistent()
                                      .build();
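
This addresses point 1 of the question ("I don't see a way to add a StateStore object to my topology. It requires a StateStoreSupplier as well though."): the StateStoreSupplier built above is what gets added to the topology in Step 2.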

Step 2: Adding the state store to your topology.

Option A - When using the Processor API:

TopologyBuilder builder = new TopologyBuilder();

// add the source processor node that takes Kafka topic "source-topic" as input
builder.addSource("Source", "source-topic")
       .addProcessor("Process", () -> new WordCountProcessor(), "Source")
       // Add the countStore associated with the WordCountProcessor processor
       .addStateStore(countStore, "Process")
       .addSink("Sink", "sink-topic", "Process");

Option B - When using the Kafka Streams DSL:

Here you call KStreamBuilder#addStateStore(), passing the StateStoreSupplier from Step 1, to add the state store to your processor topology. Then, when calling methods such as KStream#process() or KStream#transform(), you must also pass in the name of the state store. Otherwise your application will fail at runtime.

Using KStream#transform() as an example:

KStreamBuilder builder = new KStreamBuilder();

// Add the countStore that will be used within the Transformer[Supplier]
// that we pass into `transform()` below.
builder.addStateStore(countStore);

KStream<byte[], String> input = builder.stream("source-topic");

KStream<String, Long> transformed =
    input.transform(/* your TransformerSupplier */, countStore.name());

Why is it necessary for a processor to have a state store? It seems that this could well be optional for processors that are stateless and don't maintain state.

You are right -- you don't need a state store if your processor does not maintain state.
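
To illustrate why the error only appears when a store name is passed, here is a minimal, dependency-free sketch. It is a toy model, not the Kafka Streams API: ToyTopology and its methods are hypothetical stand-ins that only mimic the idea of checking store names against registered stores when the topology is built. In the 0.10.x API the state store names parameter of KStream#process() is declared as varargs, so (worth verifying against the Javadoc of your version) a stateless processor should be registrable by passing only the ProcessorSupplier and no store name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Toy model only: ToyTopology is a hypothetical stand-in, NOT the Kafka Streams API.
public class StoreNameCheckSketch {

    static class ToyTopology {
        private final Set<String> registeredStores = new HashSet<>();
        private final List<Consumer<String>> processors = new ArrayList<>();

        // Mimics the idea of addStateStore(): make a store known by name.
        ToyTopology addStateStore(String name) {
            registeredStores.add(name);
            return this;
        }

        // Mimics the idea of process(supplier, storeNames...): every named store
        // must already be registered; with no names there is nothing to check.
        ToyTopology process(Consumer<String> processor, String... stateStoreNames) {
            for (String name : stateStoreNames) {
                if (!registeredStores.contains(name)) {
                    throw new IllegalStateException(
                        "StateStore " + name + " is not added yet.");
                }
            }
            processors.add(processor);
            return this;
        }

        // Feed each record through every registered processor, in order.
        void run(List<String> records) {
            for (String record : records) {
                for (Consumer<String> processor : processors) {
                    processor.accept(record);
                }
            }
        }
    }

    // Stateless side effect (stand-in for a database write): no store names passed.
    static List<String> runStateless(List<String> records) {
        List<String> written = new ArrayList<>();
        new ToyTopology().process(written::add).run(records);
        return written;
    }

    // Naming a store that was never added reproduces the failure mode.
    static String unregisteredStoreError(String storeName) {
        try {
            new ToyTopology().process(record -> { }, storeName);
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(runStateless(Arrays.asList("a", "b")));
        System.out.println(unregisteredStoreError("my-state-store"));
    }
}
```

In this model the stateless call succeeds because no name is checked, while naming an unregistered store fails before any record is processed, which matches the "is not added yet" message from the question.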

If your processor does maintain state and you are using the DSL, call KStreamBuilder#addStateStore() with your StateStoreSupplier to add the state store to your processor topology, then reference the store by name later on.

Michael G. Noll answered Sep 28 '22 03:09