I am using the low-level Processor API with state stores. Up to 0.10.0.1 everything worked fine, but after upgrading Kafka Streams I am getting the error below. I figured out that it is caused by changelogging: when the store writes to its changelog, the change logger reads the timestamp from the record context:
java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
! at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:150)
! at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:60)
! at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:47)
! at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.put(ChangeLoggingKeyValueStore.java:66)
! at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$2.run(MeteredKeyValueStore.java:67)
@Override
public void process(String key, List<Data> values) {
    values.forEach(x -> {
        String rawKey = x.getId();
        // look up the previously stored value for this id (null on first sight)
        Data stored = kvStore.get(rawKey);
        long byteSize = stored == null ? 0 : stored.getVolume();
        x.addVolume(byteSize);
        kvStore.put(rawKey, x);
    });
}
public void start() {
    builder = new KStreamBuilder();
    storeSupplier = Stores.create(getKVStoreName())
            .withKeys(getProcessorKeySerde())
            .withValues(getProcessorValueSerde())
            .persistent()
            .build();
    builder.addStateStore(storeSupplier);
    stream = builder.stream(Serdes.String(), serde(), getTopicName());
    processStream(stream);
    streams = new KafkaStreams(builder, props);
    streams.cleanUp();
    streams.start();
}
@Override
public void init(ProcessorContext context) {
    super.init(context);
    this.context = context;
    this.context.schedule(timeinterval);
    this.kvStore = (KeyValueStore<String, Data>) context.getStateStore(getKVStoreName());
}
Exceptions like this can come up when the same Processor instance is used across multiple stream threads or partitions. Ensure that your ProcessorSupplier returns a new instance each time get() is called:

() -> new Processor(...
The same applies to Transformer and TransformerSupplier as well.
To broadly quote the documentation:
Creating a single Processor/Transformer object and returning the same object reference in ProcessorSupplier/TransformerSupplier#get() would be a violation of the supplier pattern and leads to runtime exceptions.
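As a minimal sketch of the difference, assuming the process()/init() methods shown above live in a class called MyProcessor (an illustrative name, not from the original post), wired into the start() method from the question:

// Wrong: a single Processor instance is shared by every stream task, so
// kvStore.put() can run against a ProcessorContext that is not currently
// processing a record, which is exactly when context.timestamp() throws.
MyProcessor shared = new MyProcessor();
stream.process(() -> shared, getKVStoreName());

// Right: the supplier hands out a fresh Processor per task.
stream.process(() -> new MyProcessor(), getKVStoreName());
// or equivalently, with a method reference:
stream.process(MyProcessor::new, getKVStoreName());

With one instance per task, every state store access happens while that task's own record is being processed, so the change logger can safely read the timestamp from the record context.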