
Kafka Streams - The state store may have migrated to another instance

I'm writing a basic application to test the Interactive Queries feature of Kafka Streams. Here is the code:

public static void main(String[] args) {
    StreamsBuilder builder = new StreamsBuilder();

    KeyValueBytesStoreSupplier waypointsStoreSupplier = Stores.persistentKeyValueStore("test-store");
    StoreBuilder waypointsStoreBuilder = Stores.keyValueStoreBuilder(waypointsStoreSupplier, Serdes.ByteArray(), Serdes.Integer());

    final KStream<byte[], byte[]> waypointsStream = builder.stream("sample1");

    final KStream<byte[], TruckDriverWaypoint> waypointsDeserialized =  waypointsStream
                                                                        .mapValues(CustomSerdes::deserializeTruckDriverWaypoint)
                                                                        .filter((k,v) -> v.isPresent())
                                                                        .mapValues(Optional::get);

    waypointsDeserialized.groupByKey().aggregate(
            () -> 1,
            (aggKey, newWaypoint, aggValue) -> {

                aggValue = aggValue + 1;
                return aggValue;

            }, Materialized.<byte[], Integer, KeyValueStore<Bytes, byte[]>>as("test-store").withKeySerde(Serdes.ByteArray()).withValueSerde(Serdes.Integer())
    );

    final KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(createStreamsProperties()));

    streams.cleanUp();
    streams.start();    

    ReadOnlyKeyValueStore<byte[], Integer> keyValueStore = streams.store("test-store", QueryableStoreTypes.keyValueStore());

    KeyValueIterator<byte[], Integer> range = keyValueStore.all();
    while (range.hasNext()) {
        KeyValue<byte[], Integer> next = range.next();
        System.out.println(next.value);

    }

    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

}


protected static Properties createStreamsProperties() {

    final Properties streamsConfiguration = new Properties();

    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "random167");
    streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "client-id");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    streamsConfiguration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Serdes.Integer().getClass().getName());
    //streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);

    return streamsConfiguration;
}

So my problem is, every time I run this I get this same error:

Exception in thread "main" org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, test-store, may have migrated to another instance.

I'm running only 1 instance of the application, and the topic I'm consuming from has only 1 partition.

Any idea what I'm doing wrong?

Anouer Hermassi asked Mar 15 '18 10:03


2 Answers

Looks like you have a race condition. The Kafka Streams javadoc for KafkaStreams::start() says:

Start the KafkaStreams instance by starting all its threads. This function is expected to be called only once during the life cycle of the client. Because threads are started in the background, this method does not block.

https://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/streams/KafkaStreams.html

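You're calling streams.store() immediately after streams.start(). Since start() returns before the background threads have finished starting up, I'd wager the instance hasn't fully initialized yet, so the store isn't queryable at that point.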

Since this code appears to be just for testing, add a Thread.sleep(5000) or something similar between start() and store() and give it a go. (This is not a solution for production.) Depending on your input rate into the topic, that will probably also give the store a bit of time to start filling up with events, so that your KeyValueIterator actually has something to process and print.
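A fixed sleep is a guess at how long startup takes. A common alternative is to retry the store lookup until it stops throwing. Below is a minimal sketch of that idea using only the JDK; the class name StoreRetry, the method waitUntilAvailable, and the attempt and sleep values are hypothetical and not part of Kafka Streams. The comment at the bottom shows how it might be wired to the store() call from the question, assuming the same Kafka Streams version as the original code.

```java
import java.util.function.Supplier;

// Hypothetical helper (not a Kafka Streams API): retries a lookup while it
// keeps throwing one specific exception type.
final class StoreRetry {

    private StoreRetry() {}

    /**
     * Calls lookup until it returns normally. Exceptions that are instances of
     * retryOn trigger another attempt after sleepMs milliseconds; any other
     * exception is rethrown immediately. After maxAttempts failed attempts the
     * last retryable exception is rethrown.
     */
    static <T> T waitUntilAvailable(Supplier<T> lookup,
                                    Class<? extends RuntimeException> retryOn,
                                    int maxAttempts,
                                    long sleepMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return lookup.get();
            } catch (RuntimeException e) {
                if (!retryOn.isInstance(e)) {
                    throw e; // not the "not ready yet" case, so do not hide it
                }
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(sleepMs);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt(); // keep the interrupt flag
                        throw last;
                    }
                }
            }
        }
        throw last;
    }
}

// Possible use with the code from the question (assumed, not tested here):
//
// ReadOnlyKeyValueStore<byte[], Integer> keyValueStore = StoreRetry.waitUntilAvailable(
//         () -> streams.store("test-store", QueryableStoreTypes.<byte[], Integer>keyValueStore()),
//         InvalidStateStoreException.class, 50, 100L);
```

Retrying only on the one exception type means that unrelated failures still surface right away instead of being masked by the loop, and the attempt limit keeps the caller from waiting forever if the store never becomes available.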

Kyle Fransham answered Jan 01 '23 23:01


Probably not applicable to the OP, but it might help others:

When trying to retrieve a KTable's store, make sure the KTable's topic exists first, or you'll get this exception.

Ghurdyl answered Jan 01 '23 22:01