Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Kafka Streams 0.10.1 "Failed to flush state store"

I'm trying to create a simple aggregation example in Scala with Kafka Streams 0.10.1 although I seem to fail with a simple "count" aggregation (using the Kafka console producer). With such a code:

val inputStream: KStream[String, String] = builder.stream("inputTopic")

inputStream
  .map(new KeyValueMapper[String, String, KeyValue[String, String]] {
    override def apply(k: String, v: String): KeyValue[String, String] = {
      new KeyValue[String, String](v, v)
    }
  })
  .groupByKey()
  .count(TimeWindows.of(10000L), "count-test-1")
  .toStream()
  .to("outputTopic")

it fails with "Failed to flush state store count-test-1", I've included the full stacktrace at the end of the post. On the other hand, if I use print() instead of to() it works like a charm, printing out results to the console/terminal:

[KTABLE-TOSTREAM-0000000013]: [aa@1483089460000] , 1
[KTABLE-TOSTREAM-0000000013]: [bb@1483089460000] , 1
[KTABLE-TOSTREAM-0000000013]: [cc@1483089460000] , 2
[KTABLE-TOSTREAM-0000000013]: [dd@1483089460000] , 3
[KTABLE-TOSTREAM-0000000013]: [ee@1483089460000] , 4

Does anyone have any idea what might be the cause of such behaviour?

FYI, the OSes I use are Windows 10 as a host (also running the Scala app via IntelliJ) and an Ubuntu 16.04 VM for Kafka (in a Docker container) and producer/consumer apps. I can, however, confirm that the problem can be experienced when running the app on the Ubuntu VM as well.

Thanks a lot in advance for your help, any insight is appreciated :-)

Full stacktrace:

2016-12-30 08:57:43 INFO  StreamThread:573 - stream-thread [StreamThread-1] Committing task 2_0
2016-12-30 08:57:43 ERROR StreamThread:582 - stream-thread [StreamThread-1] Failed to commit StreamTask 2_0 state:
org.apache.kafka.streams.errors.ProcessorStateException: task [2_0] Failed to flush state store count-test-1
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:331)
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:275)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:576)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:562)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:538)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:456)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Caused by: java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
        at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
        at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:72)
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
        at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
        at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35)
        at org.apache.kafka.streams.state.internals.CachingWindowStore.maybeForward(CachingWindowStore.java:103)
        at org.apache.kafka.streams.state.internals.CachingWindowStore.access$200(CachingWindowStore.java:34)
        at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:86)
        at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
        at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
        at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:118)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:329)
        ... 6 more
2016-12-30 08:57:43 INFO  StreamThread:268 - stream-thread [StreamThread-1] Shutting down
2016-12-30 08:57:43 INFO  StreamThread:358 - stream-thread [StreamThread-1] Committing consumer offsets of task 0_0
2016-12-30 08:57:43 INFO  StreamThread:358 - stream-thread [StreamThread-1] Committing consumer offsets of task 1_0
2016-12-30 08:57:43 INFO  StreamThread:358 - stream-thread [StreamThread-1] Committing consumer offsets of task 2_0
2016-12-30 08:57:43 INFO  StreamThread:751 - stream-thread [StreamThread-1] Closing a task 0_0
2016-12-30 08:57:43 INFO  StreamThread:751 - stream-thread [StreamThread-1] Closing a task 1_0
2016-12-30 08:57:43 INFO  StreamThread:751 - stream-thread [StreamThread-1] Closing a task 2_0
2016-12-30 08:57:43 INFO  StreamThread:368 - stream-thread [StreamThread-1] Flushing state stores of task 0_0
2016-12-30 08:57:43 INFO  StreamThread:368 - stream-thread [StreamThread-1] Flushing state stores of task 1_0
2016-12-30 08:57:43 INFO  StreamThread:368 - stream-thread [StreamThread-1] Flushing state stores of task 2_0
2016-12-30 08:57:43 ERROR StreamThread:330 - stream-thread [StreamThread-1] Failed while executing StreamTask 2_0 duet to flush state:
org.apache.kafka.streams.errors.ProcessorStateException: task [2_0] Failed to flush state store count-test-1
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:331)
        at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:180)
        at org.apache.kafka.streams.processor.internals.StreamThread$4.apply(StreamThread.java:369)
        at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:328)
        at org.apache.kafka.streams.processor.internals.StreamThread.flushAllState(StreamThread.java:365)
        at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:301)
        at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:269)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:252)
Caused by: java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
        at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
        at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:72)
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
        at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
        at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35)
        at org.apache.kafka.streams.state.internals.CachingWindowStore.maybeForward(CachingWindowStore.java:103)
        at org.apache.kafka.streams.state.internals.CachingWindowStore.access$200(CachingWindowStore.java:34)
        at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:86)
        at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
        at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
        at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:118)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:329)
        ... 7 more
2016-12-30 08:57:43 INFO  StreamThread:347 - stream-thread [StreamThread-1] Closing the state manager of task 0_0
2016-12-30 08:57:43 INFO  StreamThread:347 - stream-thread [StreamThread-1] Closing the state manager of task 1_0
2016-12-30 08:57:43 INFO  StreamThread:347 - stream-thread [StreamThread-1] Closing the state manager of task 2_0
2016-12-30 08:57:43 ERROR StreamThread:330 - stream-thread [StreamThread-1] Failed while executing StreamTask 2_0 duet to close state manager:
org.apache.kafka.streams.errors.ProcessorStateException: task [2_0] Failed to close state store count-test-1
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:351)
        at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:120)
        at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:348)
        at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:328)
        at org.apache.kafka.streams.processor.internals.StreamThread.closeAllStateManagers(StreamThread.java:344)
        at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:305)
        at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:269)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:252)
Caused by: java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
        at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
        at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:72)
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
        at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
        at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35)
        at org.apache.kafka.streams.state.internals.CachingWindowStore.maybeForward(CachingWindowStore.java:103)
        at org.apache.kafka.streams.state.internals.CachingWindowStore.access$200(CachingWindowStore.java:34)
        at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:86)
        at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
        at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
        at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:118)
        at org.apache.kafka.streams.state.internals.CachingWindowStore.close(CachingWindowStore.java:124)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:349)
        ... 7 more
2016-12-30 08:57:43 INFO  KafkaProducer:685 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2016-12-30 08:57:43 INFO  StreamThread:725 - stream-thread [StreamThread-1] Removing all active tasks [[0_0, 1_0, 2_0]]
2016-12-30 08:57:43 INFO  StreamThread:740 - stream-thread [StreamThread-1] Removing all standby tasks [[]]
2016-12-30 08:57:43 INFO  StreamThread:292 - stream-thread [StreamThread-1] Stream thread shutdown complete
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [2_0] Failed to flush state store count-test-1
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:331)
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:275)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:576)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:562)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:538)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:456)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Caused by: java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
        at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
        at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:72)
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
        at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
        at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35)
        at org.apache.kafka.streams.state.internals.CachingWindowStore.maybeForward(CachingWindowStore.java:103)
        at org.apache.kafka.streams.state.internals.CachingWindowStore.access$200(CachingWindowStore.java:34)
        at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:86)
        at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
        at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
        at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:118)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:329)
        ... 6 more
2016-12-30 08:57:43 INFO  KafkaStreams:237 - Stopped Kafka Stream process
like image 413
hun7er Avatar asked Dec 30 '16 09:12

hun7er


1 Answers

The result type of count(...) is not <String,Long> but <Windowed<String>,Long> because you use a windowed aggregation. Thus, your default key de/serializer that is for type String fails:

Caused by: java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String

You either need to specify a different key de/serializer in to(...) or you need to put an additional map() after toStream() to convert your key type from Windowed<String> to String.

It works if you use print() as no serialization happens in contrast to writing the result to a Kafka topic.

like image 151
Matthias J. Sax Avatar answered Sep 18 '22 22:09

Matthias J. Sax