 

Getting "java.lang.IllegalStateException: Tried to lookup lag for unknown task 3_0" after upgrading Kafka Streams from 2.5.1 to 2.6.2

I just upgraded our Kafka Streams application from 2.5.1 to 2.6.2. It worked before the upgrade; now it fails.

Here is the troublesome topology (I have omitted the irrelevant Serdes):

import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.{Joined, KStream, KTable, Materialized}

val builder = new StreamsBuilder()

// Contract events, re-keyed by tariff id so they can be joined with the tariffs table
val contractEventStream: KStream[TariffId, ContractEvent] =
  builder.stream[String, ContractUpsertAvro](settings.contractsTopicName)
    .flatMap { (_, contractAvro) =>
      ContractEvent.from(contractAvro)
        .map(contractEvent => (contractEvent.tariffId, contractEvent))
    }

// Latest tariff per tariff id
val tariffsTable: KTable[TariffId, Tariff] =
  builder.stream[String, TariffUpdateEventAvro](settings.tariffTopicName)
    .flatMapValues(Tariff.fromAvro(_))
    .selectKey((_, tariff) => tariff.tariffId)
    .toTable(Materialized.`with`(tariffIdSerde, tariffSerde)) // using Materialized.as instead throws the same IllegalStateException

contractEventStream
  .join(tariffsTable)(JourneyStep.from(_, _).asInstanceOf[ContractCreated])(Joined.`with`(tariffIdSerde, contractEventSerde, tariffSerde))
  .selectKey((_, contractUpdated) => contractUpdated.accountId)
  .foreach((_, journeyStep) => println(journeyStep))

With the join in place, the application fails with the following exception:

java.lang.IllegalStateException: Tried to lookup lag for unknown task 3_0
    at org.apache.kafka.streams.processor.internals.assignment.ClientState.lagFor(ClientState.java:306)
    at java.util.Comparator.lambda$comparingLong$6043328a$1(Comparator.java:511)
    at java.util.Comparator.lambda$thenComparing$36697e65$1(Comparator.java:216)
    at java.util.TreeMap.compare(TreeMap.java:1295)
    at java.util.TreeMap.put(TreeMap.java:538)
    at java.util.TreeSet.add(TreeSet.java:255)
    at java.util.AbstractCollection.addAll(AbstractCollection.java:344)
    at java.util.TreeSet.addAll(TreeSet.java:312)
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.getPreviousTasksByLag(StreamsPartitionAssignor.java:1275)
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads(StreamsPartitionAssignor.java:1189)
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.computeNewAssignment(StreamsPartitionAssignor.java:940)
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:399)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:589)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:684)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:111)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:597)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:560)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1160)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1135)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1296)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:767)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)

I can't see what I am doing wrong. The code above works with Kafka Streams 2.5.1. Does anyone have an idea what is going on?

asked May 15, 2021 at 11:05 by tomvogel01



1 Answer

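The problem is caused by the local state that Kafka Streams keeps on disk. This state is specific to the Kafka Streams version and to the topology you run: it is stored per task, in directories named like the task in the error message (3_0 means subtopology 3, partition 0). So besides a version upgrade, a change in your topology can also lead to this error.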
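To check which tasks the local state still refers to, you could list the task directories. This is a minimal sketch under the assumption that Kafka Streams keeps one subdirectory per task, named "<subtopologyId>_<partition>", under "<state.dir>/<application.id>"; listTaskDirs is a hypothetical helper, not part of the Kafka Streams API.

```scala
import java.io.File

object StateDirInspector {
  // Assumed naming of task directories: "<subtopologyId>_<partition>", e.g. "3_0".
  private val TaskDirName = """(\d+)_(\d+)""".r

  /** Hypothetical helper: returns (subtopologyId, partition) for every task
    * directory found under stateDir/applicationId, sorted. Files and
    * directories with other names (lock files etc.) are ignored. */
  def listTaskDirs(stateDir: String, applicationId: String): Seq[(Int, Int)] = {
    val appDir = new File(stateDir, applicationId)
    // listFiles() returns null if the directory does not exist
    val children: Seq[File] = Option(appDir.listFiles()).map(_.toSeq).getOrElse(Seq.empty)
    children
      .filter(_.isDirectory)
      .map(_.getName)
      .collect { case TaskDirName(sub, part) => (sub.toInt, part.toInt) }
      .sorted
  }
}
```

For example, listTaskDirs("/tmp/kafka-streams", "my-app"), with "my-app" standing in for your application.id, might return entries such as (3, 0). You can compare the subtopology ids with builder.build().describe(), which lists the subtopologies of the current topology; a directory whose subtopology id does not appear there can only be left over from an earlier topology.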

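By default this state lives in /tmp/kafka-streams; if you set the "state.dir" property, it is wherever that property points. Stop the application, delete that directory (or just the subdirectory named after your application.id), and you should be able to start cleanly again. State stores with change logging enabled (the default) are then restored from their changelog topics, which can take a while for large stores.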
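If you want to script that manual cleanup, here is a minimal sketch using only java.nio that recursively deletes "<state.dir>/<application.id>". The helper name wipeAppState and the directory layout it assumes are illustrative assumptions, not part of the Kafka Streams API; run it only while no instance of the application is running on that machine.

```scala
import java.io.IOException
import java.nio.file.{FileVisitResult, Files, Path, Paths, SimpleFileVisitor}
import java.nio.file.attribute.BasicFileAttributes

object StateDirCleaner {
  /** Hypothetical helper: recursively deletes stateDir/applicationId.
    * Returns true if the directory existed and was deleted, false otherwise. */
  def wipeAppState(stateDir: String, applicationId: String): Boolean = {
    val appDir: Path = Paths.get(stateDir, applicationId)
    if (!Files.exists(appDir)) false
    else {
      Files.walkFileTree(appDir, new SimpleFileVisitor[Path] {
        // Delete every regular file (RocksDB files, checkpoint files, ...)
        override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
          Files.delete(file)
          FileVisitResult.CONTINUE
        }
        // Delete each directory once its contents are gone
        override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = {
          if (exc != null) throw exc
          Files.delete(dir)
          FileVisitResult.CONTINUE
        }
      })
      true
    }
  }
}
```

Kafka Streams also has a built-in for this: calling cleanUp() on a KafkaStreams instance before start() (or after close()) deletes the local state directory for its application.id. Doing that on every start forces a full restore each time, so it is better used as a one-off after an upgrade or topology change.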

answered Oct 4, 2022 at 12:10 by Jeroen
