i have a kafka topic and a KTable that listens to it.
i want to write a http POST request that will traverse the current items in the ktable, perform some action on them and write back to the topic
so basically i have:
private val accessTokenTable: KTable[String, String] = builder.table(token_topic_name, tokenStoreString)
val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
stream.cleanUp()
stream.start()
....
override def refreshTokens = {
accessTokenTable.mapValues {
new ValueMapper[String, String] {
override def apply(value: String) = {
value
}
}
}.print(token_topic_name)
}
and when i try to call this method nothing is printed/written to the topic
what am i missing? is my only choice is to write the messages from the ktable to hashmap and read it from there? it misses the whole point of ktables?
the correct solution is to use GlobalKTable to avoid "the state store may have migrated to another instance" errors as discussed here.
Since you answered your own question and apparently run into another issue in your follow-up, let me expand on what you said in your answer to help other readers of this question thread.
Note: Normally, you don't make a decision between KTable vs. GlobalKTable because you want to prevent "state store may have migrated" situations, but because the two abstractions provide different semantics to your application. For example, there are many good reasons to use a KTable rather than a GlobalKTable -- and if you do, you simply need to be aware of what we just discussed here (which is covered in the docs, too, but apparently not obvious/clear enough considering that you did run into this question).
Hope this helps!
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With