
Kafka Streams in docker-compose takes long time for partition assignment

I'm running a Kafka streaming application in a Docker container. For testing purposes I have a docker-compose file that runs the streaming application, a single instance of Kafka, and ZooKeeper. The configuration for both Kafka and ZooKeeper has worked before.

It takes upwards of 5 minutes for the Kafka streaming application to be assigned partitions. If I delay starting the stream container until Kafka and ZooKeeper are up and the topic that the streaming application consumes has been created, it gets its partitions almost instantly.

It seems that the Kafka Streams consumer group is being created but the application is being assigned no partitions, presumably because the topic hasn't been fully created yet. It does not get partitions assigned until the next generation, which seems to take almost exactly 5 minutes.

In my (limited) understanding of the situation, I have a few options for decreasing this delay:

  • check to see that the topic has metadata before starting the streaming application
  • decrease the interval between generations (seems like this could have problems in production, but may be fine for testing)

However, I realize I might be missing something obvious considering my limited knowledge in this area.
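
The first option above (waiting for topic metadata before starting the streams application) could be sketched roughly as below. This is only an illustration, not a tested fix for this setup: StartupGate, awaitReady and the probe parameter are hypothetical names, and the readiness check itself is left pluggable. With the kafka-clients library on the classpath, the probe could, for example, ask AdminClient.listTopics() whether the topic name is present.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper: blocks startup until a readiness probe succeeds.
final class StartupGate {

    /**
     * Polls the probe until it returns true or the timeout elapses.
     * A probe that throws is treated as "not ready yet", since the broker
     * may simply be unreachable while the containers are still starting.
     *
     * @return true if the probe succeeded before the deadline, false otherwise
     */
    static boolean awaitReady(BooleanSupplier probe, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                if (probe.getAsBoolean()) {
                    return true;
                }
            } catch (RuntimeException e) {
                // Assumption: any probe failure just means "try again later".
            }
            if (System.nanoTime() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

The streams application would call awaitReady with a probe for "kafka-queue" before starting, and exit or retry if it returns false. This keeps the workaround in the test setup and leaves the client's refresh settings untouched.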

EDIT: docker-compose file for reference

version: "3.3"
services:
  kafka-stream-ingestor:
    build:
      context: .
      dockerfile: Dockerfile
      args:
        - version
    networks:
      - services

  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - 2181:2181
    networks:
      - services

  kafka:
    image: wurstmeister/kafka:latest
    ports:
      - 9094:9094
      - 9092:9092
    environment:
      KAFKA_ADVERTISED_HOST_NAME: ${DOCKER_KAFKA_HOST}
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_ADVERTISED_PROTOCOL_NAME: OUTSIDE
      KAFKA_ADVERTISED_PORT: 9094
      KAFKA_CREATE_TOPICS: "kafka-queue:12:1"
      KAFKA_PROTOCOL_NAME: INSIDE
      KAFKA_PORT: 9092
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    networks:
      - services

networks:
  services:

volumes:
  testresult:
asked Jan 05 '18 16:01 by shaftoes



1 Answer

I've found a temporary solution that will work under limited circumstances (only required for testing locally, or via integration tests). I will not mark this as solved to allow for better answers.

Essentially, the stream app asks for metadata before the partitions are ready. Kafka says 'there are no partitions yet', the app says 'okay, there are no partitions for assignment', and then it waits a (configurable) amount of time until the partition metadata has become stale. It then makes another request to Kafka, which, at this point, has created the partitions.

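The configuration that dictates this refresh interval is kafka.metadata.max.age.ms (the Kafka client setting metadata.max.age.ms, whose default of 300000 ms matches the 5-minute delay described in the question). I set this to 1000ms.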
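
For illustration, here is a minimal sketch of how that override might be applied to a streams configuration built from java.util.Properties. It assumes the application passes its Properties straight to the Kafka client. StreamsTestConfig and withFastMetadataRefresh are hypothetical names, and the "kafka." prefix from the setting above is dropped, on the assumption that it belongs to the application's own configuration layer.

```java
import java.util.Properties;

// Hypothetical helper for local or integration-test configuration only.
final class StreamsTestConfig {

    /**
     * Returns a copy of the base configuration with a shorter metadata
     * refresh interval, so a topic created shortly after startup is
     * noticed quickly. The base Properties object is not modified.
     */
    static Properties withFastMetadataRefresh(Properties base, long maxAgeMs) {
        Properties props = new Properties();
        props.putAll(base);
        // "metadata.max.age.ms" is the Kafka client setting; its default is 300000 (5 minutes).
        props.setProperty("metadata.max.age.ms", Long.toString(maxAgeMs));
        return props;
    }
}
```

Calling withFastMetadataRefresh(baseProps, 1000L) yields the 1000 ms value mentioned above. A refresh this frequent adds metadata requests to the broker, so it is probably best kept to test environments.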

answered Nov 15 '22 05:11 by shaftoes