I am using kafka_2.11-2.1.1 and Producer using spring 2.1.0.RELEASE.
I am using spring while I am sending the messages to Kafka topic my producer generates a lot of TimeoutExceptions
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for COMPANY_INBOUND--19: 229 ms has passed since batch creation plus linger time
I am using below kafka producer settings
acks: 1
retries: 1
batchSize: 100
lingerMs: 5
bufferMemory: 33554432
requestTimeoutMs: 60
I tried many combinations (specially batchSize
& lingerMs
) but nothing worked. Any help please what should be the setting for above scenario.
Tried again with below configs ...but no luck same error
acks = 1
batch.size = 15
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class com.spgmi.ca.prescore.partition.CompanyInfoPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120
retries = 1
Second Time Run :
I treid different combinations nothing worked. Hence i thought it would be problem with network , SSL etc. So I installed and run the Kafka on the same machine where producer is running i.e. in my local computer.
I tried to run the producer again pointing to the local Kafka topics. But no luck same issue.
Below are configuration params used.
2019-07-02 05:55:36.663 INFO 9224 --- [lt-dispatcher-2] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 0
bootstrap.servers = [localhost:9092]
request.timeout.ms = 60
retries = 1
buffer.memory = 33554432
linger.ms = 0
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = []
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
Facing same error : org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for inbound_topic--1: 69 ms has passed since batch creation plus linger time
Also tried batch.size 5 , 10 & 0 linger_ms 0 , 5 , 10 etc. request_time_out 0 , 45, 60, 120 , 300 etc.
Nothing working ...same error.
What else should I try , what could be the solution ?
How to avoid negative key generation
Yeah I set up local set up and print the log with partition info which shows as below
2019-07-03 02:48:28.822 INFO 7092 --- [lt-dispatcher-2] c.s.c.p.p.CompanyInfoPartitioner : Topic : inbound_topic Key = 597736248- Entropy Cayman Solar Ltd.-null-null-null Partition = -1 2019-07-03 02:48:28.931 ERROR 7092 --- [ad | producer-1] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='597736248- Entropy Cayman Solar Ltd.-null-null-null' and payload='com.spgmi.ca.prescore.model.Company@8b12343' to topic inbound_topic :
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for inbound_topic --1: 104 ms has passed since batch creation plus linger time
My topics inbound_topic has two partitions as you see below C:\Software\kafka\kafka_2.11-2.1.1\bin\windows>kafka-topics.bat --describe --zookeeper localhost:2181 --topic inbound_topic Topic:inbound_topic PartitionCount:2 ReplicationFactor:1 Configs: Topic: inbound_topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: inbound_topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
But my producer seems to trying to send to Partition = -1.
My partition logic is as below
int p = (((String)key).hashCode() * Integer.MAX_VALUE) % numPartitions;
logger.info("Topic : "+ topic + "\t Key = " + (String)key + " Partition = " + p );
On key i am doing hashCode(). What need to be corrected here to avoid this negative number partition number ? i.e. Partition = -1
What should be my partition key logic like ?
anyhelp highly appriciated.
size measures batch size in total bytes instead of the number of messages. It controls how many bytes of data to collect before sending messages to the Kafka broker. Set this as high as possible, without exceeding available memory. The default value is 16384.
linger.ms refers to the time to wait before sending messages out to Kafka. It defaults to 0, which the system interprets as 'send messages as soon as they are ready to be sent'. batch. size refers to the maximum amount of data to be collected before sending the batch.
max.block.ms is used for producer to block buffer time, serialization time etc.
The error indicates that some records are put into the queue at a faster rate than they can be sent from the client.
When your Producer sends messages, they are stored in buffer (before sending them to the target broker) and the records are grouped together into batches in order to increase throughput. When a new record is added to the batch, it must be sent within a -configurable- time window which is controlled by request.timeout.ms
(the default is set to 30 seconds). If the batch is in the queue for longer time, a TimeoutException
is thrown and the batch records will then be removed from the queue and won't be delivered to the broker.
Increasing the value of request.timeout.ms
should do the trick for you.
In case this does not work, you can also try decreasing batch.size
so that batches are sent more often (but this time will include fewer messages) and make sure that linger.ms
is set to 0 (which is the default value).
Note that you need to restart your kafka brokers after changing any configuration parameter.
If you still get the error I assume that something wrong is going on with your network. Have you enabled SSL?
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With