Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Error reading field 'topic_metadata' in Kafka

I am trying to connect to my broker on aws with auto.create.topics.enable=true in my server.properties file. But when I am trying to connect to broker using Java client producer I am getting the following error.

1197 [kafka-producer-network-thread | producer-1] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topic_metadata': Error reading array of size 619631, only 37 bytes available at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73) at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:380) at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134) at java.lang.Thread.run(Unknown Source)

Following is my Client producer code.

public static void main(String[] argv){
         Properties props = new Properties();
         props.put("bootstrap.servers", "http://XX.XX.XX.XX:9092");
         props.put("acks", "all");
         props.put("retries", 0);
         props.put("batch.size", 16384);
         props.put("linger.ms", 0);
         props.put("buffer.memory", 33554432);
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("block.on.buffer.full",true);
         Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try{ for(int i = 0; i < 10; i++)
        { producer.send(new ProducerRecord<String, String>("topicjava", Integer.toString(i), Integer.toString(i)));
             System.out.println("Tried sending:"+i);}
        }
        catch (Exception e){
            e.printStackTrace();
        }
         producer.close();
}

Can someone help me resolve this?

like image 991
user2966021 Avatar asked May 28 '16 06:05

user2966021


3 Answers

I have faced the similar issue. The problem here is, when there is a mismatch between kafka clients version in pom file and kafka server is different. I was using kafka clients 0.10.0.0_1 but the kafka server was still in 0.9.0.0. So i upgraded the kafka server version to 10 the issue got resolved.

<dependency>
            <groupId>org.apache.servicemix.bundles</groupId>
            <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
            <version>0.10.0.0_1</version>
        </dependency>            
like image 109
Sanjeev Avatar answered Nov 03 '22 13:11

Sanjeev


Looks like I was setting wrong properties at the client side also my server.properties file had properties which were not meant for the client I was using.So I decided to change the java client to version 0.9.0 using maven.

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.0</version>
</dependency>

my server.properties file is as below.

broker.id=0
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=9000
delete.topic.enable=true
advertised.host.name=<aws public Ip>
advertised.port=9092

My producer code looks like

    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    public class HelloKafkaProducer 
     {


       public static void main(String args[]) throws InterruptedException,      ExecutionException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"IP:9092");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        KafkaProducer<String,String> producer = new KafkaProducer<String,String>(props);

        boolean sync = false;
        String topic="loader1";
        String key = "mykey";
        for(int i=0;i<1000;i++)
        {
        String value = "myvaluehasbeensent"+i+i;
        ProducerRecord<String,String> producerRecord = new ProducerRecord<String,String>(topic, key, value);
        if (sync) {
            producer.send(producerRecord).get();
        } else {
            producer.send(producerRecord);
        }
        }
        producer.close();
    }
 }
like image 31
user2966021 Avatar answered Nov 03 '22 12:11

user2966021


Make sure that you use the correct versions. Lets say you use following maven dependecy:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
  <version>${flink.version}</version>
</dependency>

So the artifact equals: flink-connector-kafka-0.8_2.10

Now check if you use the correct Kafka version:

cd /KAFKA_HOME/libs

Now find kafka_YOUR-VERSION-sources.jar.

In my case I have kafka_2.10-0.8.2.1-sources.jar. So it works fine! :) If you use different versions, just change maven dependecies OR download the correct kafka version.

like image 37
lidox Avatar answered Nov 03 '22 12:11

lidox