
Kafka Client Timeout of 60000ms expired before the position for partition could be determined

I'm trying to connect a Flink job to Kafka through a Kafka consumer.

I'm using Docker Compose to build 4 containers: ZooKeeper, Kafka, a Flink JobManager, and a Flink TaskManager.

For ZooKeeper and Kafka I'm using the wurstmeister images, and for Flink I'm using the official image.

docker-compose.yml

version: '3.1'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    hostname: zookeeper
    expose:
      - "2181"
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka:2.11-2.0.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    hostname: kafka
    links:
      - zookeeper
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_CREATE_TOPICS: 'pipeline:1:1:compact'

  jobmanager:
    build: ./flink_pipeline
    depends_on:
      - kafka
    links:
      - zookeeper
      - kafka
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      JOB_MANAGER_RPC_ADDRESS: jobmanager
      BOOTSTRAP_SERVER: kafka:9092
      ZOOKEEPER: zookeeper:2181

  taskmanager:
    image: flink
    expose:
      - "6121"
      - "6122"
    links:
      - jobmanager
      - zookeeper
      - kafka
    depends_on:
      - jobmanager
    command: taskmanager
    # links:
    #   - "jobmanager:jobmanager"
    environment:
      JOB_MANAGER_RPC_ADDRESS: jobmanager

When I submit a simple job to the Dispatcher, the job fails with the following error:

org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition pipeline-0 could be determined

My Job code is:

public class Main {
    public static void main( String[] args ) throws Exception
    {
        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // configure the Kafka consumer from environment variables
        Properties properties = new Properties();
        String bootstrapServer = System.getenv("BOOTSTRAP_SERVER");
        String zookeeperServer = System.getenv("ZOOKEEPER");

        if (bootstrapServer == null) {
            System.exit(1);
        }

        properties.setProperty("zookeeper", zookeeperServer);
        properties.setProperty("bootstrap.servers", bootstrapServer);
        properties.setProperty("group.id", "pipeline-analysis");

        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("pipeline", new SimpleStringSchema(), properties);
        // kafkaConsumer.setStartFromGroupOffsets();
        kafkaConsumer.setStartFromLatest();

        DataStream<String> stream = env.addSource(kafkaConsumer);

        // Defining Pipeline here

        // Printing Outputs
        stream.print();

        env.execute("Stream Pipeline");
    }
}
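
A side note on the job code above: only BOOTSTRAP_SERVER is checked for null, yet `Properties.setProperty` throws a NullPointerException when given a null value, so a missing ZOOKEEPER variable would crash the job before the consumer is even created. Below is a minimal sketch of a fail-fast variant. The class `KafkaEnvProperties` and the method `fromEnv` are hypothetical names introduced here for illustration, and treating ZOOKEEPER as optional is an assumption, not something the question states.

```java
import java.util.Map;
import java.util.Properties;

// Sketch only: KafkaEnvProperties and fromEnv are hypothetical names, not part of the original job.
class KafkaEnvProperties {

    // Builds the consumer properties from an environment map, failing fast with a clear message.
    static Properties fromEnv(Map<String, String> env) {
        String bootstrapServer = env.get("BOOTSTRAP_SERVER");
        if (bootstrapServer == null || bootstrapServer.isEmpty()) {
            throw new IllegalStateException("BOOTSTRAP_SERVER is not set");
        }

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServer);
        properties.setProperty("group.id", "pipeline-analysis");

        // Assumption: ZOOKEEPER is optional. It is only copied when present,
        // because setProperty rejects null values with a NullPointerException.
        String zookeeperServer = env.get("ZOOKEEPER");
        if (zookeeperServer != null) {
            properties.setProperty("zookeeper", zookeeperServer);
        }
        return properties;
    }

    public static void main(String[] args) {
        // A literal map stands in for System.getenv() so the sketch runs anywhere.
        Properties properties = fromEnv(Map.of("BOOTSTRAP_SERVER", "kafka:9092"));
        System.out.println(properties.getProperty("bootstrap.servers"));
    }
}
```

In the job itself the call would be `fromEnv(System.getenv())`. Throwing an exception instead of calling `System.exit(1)` leaves a message in the logs that names the missing variable.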
Mahmoud Sultan asked Feb 24 '19 15:02



1 Answer

I know I'm late to the party, but I had the exact same error. In my case, I was not setting up the TopicPartitions correctly. My topic had 2 partitions and my producer was producing messages just fine, but my consumer, a Spark Streaming application, never really started and gave up after 60 seconds with the same error.

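The wrong code I had, which builds a single TopicPartition whose index is the partition count (with 2 partitions that is partition 2, which does not exist because partitions are numbered from 0):

List<TopicPartition> topicPartitionList = Arrays.asList(new TopicPartition(topicName, Integer.parseInt(numPartitions)));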

The correct code, which adds one TopicPartition per partition index:

List<TopicPartition> topicPartitionList = new ArrayList<TopicPartition>();

for (int i = 0; i < Integer.parseInt(numPartitions); i++) {
    topicPartitionList.add(new TopicPartition(topicName, i));
}
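
The same fix can be wrapped in a small helper. This is a minimal sketch: `PartitionLists` and `allPartitions` are hypothetical names, the nested `TopicPartition` is a local stand-in so the example compiles without the kafka-clients dependency (in a real application use `org.apache.kafka.common.TopicPartition`), and the topic name "pipeline" is only illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: PartitionLists and allPartitions are hypothetical names for illustration.
class PartitionLists {

    // Local stand-in for org.apache.kafka.common.TopicPartition so this compiles on its own.
    static final class TopicPartition {
        private final String topic;
        private final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        String topic() { return topic; }

        int partition() { return partition; }

        @Override
        public String toString() { return topic + "-" + partition; }
    }

    // Builds one entry per partition, with indexes 0 .. numPartitions - 1.
    static List<TopicPartition> allPartitions(String topicName, int numPartitions) {
        if (numPartitions < 1) {
            throw new IllegalArgumentException("numPartitions must be >= 1, got " + numPartitions);
        }
        List<TopicPartition> partitions = new ArrayList<>(numPartitions);
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new TopicPartition(topicName, i));
        }
        return partitions;
    }

    public static void main(String[] args) {
        // A topic with 2 partitions yields indexes 0 and 1; index 2 never appears.
        System.out.println(allPartitions("pipeline", 2));
    }
}
```

Taking the partition count as an int rather than a String keeps the parsing, and any NumberFormatException, at the place where the configuration is read.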
Sanjeev Dhiman answered Oct 02 '22 07:10
