Kafka Streams 0.10.2.0: state store throws an exception while storing a value

I am using the low-level Processor API with state stores. Everything worked up to 0.10.0.1, but after upgrading Kafka Streams I get the error below. As far as I can tell, it comes from the changelog logging, which reads the timestamp from the record context:

java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
! at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:150)
! at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:60)
! at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:47)
! at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.put(ChangeLoggingKeyValueStore.java:66)
! at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$2.run(MeteredKeyValueStore.java:67)

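@Override
    public void process(String arg0, List<Data> data) {
        data.forEach((x) -> {
            String rawKey = x.getId();
            // Previously stored entry for this key, or null if none exists yet
            Data stored = kvStore.get(rawKey);
            long bytesize = stored == null ? 0 : stored.getVolume();
            // Add the stored volume to the incoming record, then write it back
            x.addVolume(bytesize);
            kvStore.put(rawKey, x);
        });
    }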
  
public void start() {
        builder = new KStreamBuilder();
        storeSupplier = Stores.create(getKVStoreName()).withKeys(getProcessorKeySerde()).withValues(getProcessorValueSerde()).persistent().build();
        builder.addStateStore(storeSupplier);
        stream = builder.stream(Serdes.String(), serde(), getTopicName());
        processStream(stream);
        streams = new KafkaStreams(builder, props);
        streams.cleanUp();
        streams.start();
    }

    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        this.context = context;
        this.context.schedule(timeinterval);
        this.kvStore = (KeyValueStore) context.getStateStore(getKVStoreName());
    }
Bharateshwar Patil asked May 18 '17 09:05




1 Answer

Exceptions like this can occur when the same Processor instance is shared across multiple stream threads or partitions.

Make sure your ProcessorSupplier returns a new instance on every call:

new ProcessorSupplier(() -> new Processor(...

The same applies to Transformer and TransformerSupplier as well.

To broadly quote the documentation:

Creating a single Processor/Transformer object and returning the same object reference in ProcessorSupplier/TransformerSupplier#get() would be a violation of the supplier pattern and leads to runtime exceptions.
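
To make the difference concrete, here is a minimal sketch that does not use the Kafka Streams API at all. `java.util.function.Supplier` stands in for ProcessorSupplier, and `CountingProcessor`, `sharedSupplier()` and `freshSupplier()` are hypothetical names invented for illustration. It only shows the reference behaviour of the two supplier styles, not how Kafka Streams reacts to them.

```java
import java.util.function.Supplier;

public class SupplierPatternDemo {

    /** Hypothetical stand-in for a stateful Processor (not a Kafka class). */
    static final class CountingProcessor {
        private int seen; // per-instance state, comparable to a context or store handle

        void process(String key) {
            seen++;
        }

        int seen() {
            return seen;
        }
    }

    /** Problematic: every get() hands back the same object. */
    static Supplier<CountingProcessor> sharedSupplier() {
        CountingProcessor single = new CountingProcessor();
        return () -> single;
    }

    /** Intended pattern: every get() creates a new object. */
    static Supplier<CountingProcessor> freshSupplier() {
        return CountingProcessor::new;
    }

    public static void main(String[] args) {
        Supplier<CountingProcessor> shared = sharedSupplier();
        Supplier<CountingProcessor> fresh = freshSupplier();

        // prints "shared supplier reuses instance: true"
        System.out.println("shared supplier reuses instance: " + (shared.get() == shared.get()));
        // prints "fresh supplier reuses instance: false"
        System.out.println("fresh supplier reuses instance: " + (fresh.get() == fresh.get()));
    }
}
```

With the shared supplier, anything that calls `get()` more than once ends up mutating the same object's state. In your real topology the equivalent check is whether the lambda or `get()` method you pass in contains a `new` expression, rather than returning a field or a captured variable.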

amcc answered Sep 28 '22 09:09
