Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Kafka 0.10 Java Client TimeoutException: Batch containing 1 record(s) expired

I have a single node, multi (3) broker Zookeeper / Kafka setup. I am using the Kafka 0.10 Java client.

I wrote following simple remote (on a different Server than Kafka) Producer (in the code I replaced my public IP address with MYIP):

Properties config = new Properties();
try {
    config.put(ProducerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName());
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "MYIP:9092, MYIP:9093, MYIP:9094");
    config.put(ProducerConfig.ACKS_CONFIG, "all");
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
    producer = new KafkaProducer<String, byte[]>(config);
    Schema.Parser parser = new Schema.Parser();
    schema = parser.parse(GATEWAY_SCHEMA);
    recordInjection = GenericAvroCodecs.toBinary(schema);
    GenericData.Record avroRecord = new GenericData.Record(schema);
    //Filling in avroRecord (code not here)
    byte[] bytes = recordInjection.apply(avroRecord);

    Future<RecordMetadata> future = producer.send(new ProducerRecord<String, byte[]>(datasetId+"", "testKey", bytes));
    RecordMetadata data = future.get();
} catch (Exception e) {
    e.printStackTrace();
}

My server properties for the 3 brokers look like this (in the 3 different server properties files broker.id is 0, 1, 2 and listeners is PLAINTEXT://:9092, PLAINTEXT://:9093, PLAINTEXT://:9094 and host.name is 10.2.0.4, 10.2.0.5, 10.2.0.6). This is the first server properties file:

broker.id=0
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka1-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000

When I execute the code, I get following exception:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for 100101-0
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:65)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:52)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
    at com.nr.roles.gateway.GatewayManager.addTransaction(GatewayManager.java:212)
    at com.nr.roles.gateway.gw.service(gw.java:126)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:821)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:583)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1158)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1090)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:109)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:119)
    at org.eclipse.jetty.server.Server.handle(Server.java:517)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:308)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:242)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:261)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:95)
    at org.eclipse.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:75)
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceAndRun(ExecuteProduceConsume.java:213)
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:147)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:654)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
    at java.lang.Thread.run(Thread.java:745)
 Caused by: org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for 100101-0

Does anyone know what I am missing? Any help would be appreciated. Thanks a lot

like image 249
Armen Avatar asked Jul 08 '16 06:07

Armen


2 Answers

I encounter the same problems.

You should change your kafka server.properties to specify ip address. eg:

PLAINTEXT://YOUIP:9093

if not, kafka will use hostname, if the producer can not get the host, it can not send message to kafka even if you can telnet them.

like image 139
vr3C Avatar answered Nov 15 '22 00:11

vr3C


Port information in your BOOTSTRAP_SERVERS_CONFIG configuration is incorrect (MYIP:9092).

As you've mentioned in server.properties as "PLAINTEXT://:9093, PLAINTEXT://:9093, PLAINTEXT://:9094".

like image 25
Kamal Chandraprakash Avatar answered Nov 14 '22 22:11

Kamal Chandraprakash