Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

can i traverse the items in a KTable from an external method

i have a kafka topic and a KTable that listens to it.

i want to write a http POST request that will traverse the current items in the ktable, perform some action on them and write back to the topic

so basically i have:

private val accessTokenTable: KTable[String, String] = builder.table(token_topic_name, tokenStoreString)
    val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
    stream.cleanUp()
    stream.start()

....

override def refreshTokens = {

    accessTokenTable.mapValues {
        new ValueMapper[String, String] {
            override def apply(value: String) = {
                value
            }
        }
    }.print(token_topic_name)
}

and when i try to call this method nothing is printed/written to the topic

what am i missing? is my only choice is to write the messages from the ktable to hashmap and read it from there? it misses the whole point of ktables?

like image 233
arseny Avatar asked Oct 30 '22 01:10

arseny


1 Answers

the correct solution is to use GlobalKTable to avoid "the state store may have migrated to another instance" errors as discussed here.

Since you answered your own question and apparently run into another issue in your follow-up, let me expand on what you said in your answer to help other readers of this question thread.

  • If you are using a KTable (which is partitioned = each "instance" of a KTable sees only a part of the total table data) Typically, what you need to do is guard against this exception and retry. Think: try-catch-retry.
  • If you are using a GlobalKTable, then you are side-stepping this problem because each instance of a GlobalKTable has a full copy of the entire table data.

Note: Normally, you don't make a decision between KTable vs. GlobalKTable because you want to prevent "state store may have migrated" situations, but because the two abstractions provide different semantics to your application. For example, there are many good reasons to use a KTable rather than a GlobalKTable -- and if you do, you simply need to be aware of what we just discussed here (which is covered in the docs, too, but apparently not obvious/clear enough considering that you did run into this question).

Hope this helps!

like image 139
Michael G. Noll Avatar answered Nov 15 '22 05:11

Michael G. Noll