Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Error reading field 'topics': java.nio.BufferUnderflowException in Kafka

9.0 client to consume messages from two brokers which are running on a remote system.My producer is working fine and is able to send messages to the broker but my consumer is not able to consume these messages.Consumer and producer are running on my local system and the two brokers are on aws. Whenever I try to run consumer. Following error appears on the broker logs.

ERROR Closing socket for /122.172.17.81 because of error (kafka.network.Processor)
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topics': java.nio.BufferUnderflowException
        at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:66)
        at org.apache.kafka.common.requests.JoinGroupRequest.parse(JoinGroupRequest.java:85)
        at kafka.api.JoinGroupRequestAndHeader$.readFrom(JoinGroupRequestAndHeader.scala:29)
        at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50)
        at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50)
        at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:50)
        at kafka.network.Processor.read(SocketServer.scala:450)
        at kafka.network.Processor.run(SocketServer.scala:340)
        at java.lang.Thread.run(Thread.java:745)

My Consumer code is as follows>

package Kafka1.K1;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class HelloKafkaConsumer 
{
    public static void main(String args[]) throws InterruptedException, ExecutionException {
        String a[] = new String[]{"loader1"};
        //topik.add("loader1");
Properties props = new Properties();
 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"IP1:9092,IP2:9093");
 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
 props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
 props.put("heartbeat.interval.ms", "500");
 props.put("session.timeout.ms", "1000");
 props.put("enable.auto.commit", "true");
 props.put("auto.commit.interval.ms", "10000");
 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
 consumer.subscribe(Arrays.asList(a));
 while (true) {
        // Poll for ConsumerRecords for a certain amount of time
        ConsumerRecords<String, String> records = consumer.poll(1000);

        // Process the ConsumerRecords, if any, that came back
        for (ConsumerRecord<String, String> record : records) {
                String key = record.key();
                String value = record.value();
                System.out.println(key+":"+value);
                // Do something with message
        }
      }

    }
}

Can someone help?

like image 861
user2966021 Avatar asked Jun 01 '16 09:06

user2966021


1 Answers

This issue occurs when the kafka cluster running on your machine is older version i.e 0.8.x.x where as the client being used to access data from the cluster is of higher version i.e 0.9.x.x.

There are two simple solutions based on requirements:

  1. Downgrade the client version.
  2. Upgrade the kafka cluster.
like image 55
Rajat Avatar answered Nov 15 '22 07:11

Rajat