We have a KStreams application with the following configuration:
props.setProperty(RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE));
props.setProperty(RETRY_BACKOFF_MS_CONFIG, "5000"); // 5 seconds
props.setProperty(RECONNECT_BACKOFF_MS_CONFIG, "5000"); // 5 seconds
props.setProperty(REQUEST_TIMEOUT_MS_CONFIG, "5000"); // 5 seconds
props.setProperty(SESSION_TIMEOUT_MS_CONFIG, "25000"); // 25 seconds session timeout
props.setProperty(MAX_POLL_RECORDS_CONFIG, "100"); // 100 records per poll
props.setProperty(MAX_POLL_INTERVAL_MS_CONFIG, String.valueOf(Integer.MAX_VALUE));
// do not add any more time to window retention period, delete immediately
props.setProperty(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, "0");
Even with a very large MAX_POLL_INTERVAL_MS_CONFIG, we see errors like the following (exceptions formatted as JSON):
{
"@timestamp": "2020-02-07T15:30:19.631Z",
"message": "[Consumer clientId=client-03a38ada-b39c-497a-acd4-aa95066fdc8a-StreamThread-6-consumer, groupId=group-name] Offset commit failed on partition group-name-repartition-3 at offset 9066: The coordinator is not aware of this member.",
"logger_name": "org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
"level": "ERROR"
}
What else do we need to configure? Is there any other parameter involved? I should mention that the Kafka broker is a managed service and we do not control its server-side configuration parameters. Additionally, the commit interval is set to 10 seconds. Everything else is the default for KStreams 2.4.0.
Another possible cause of this problem is the consumer not sending a heartbeat within session.timeout.ms, so you could consider increasing that value.
heartbeat.interval.ms: The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.
session.timeout.ms: The timeout used to detect client failures when using Kafka's group management facility. The client sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this client from the group and initiate a rebalance.
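As a rough sketch of that suggestion, the snippet below raises session.timeout.ms and keeps heartbeat.interval.ms at one third of it, in line with the guidance quoted above. The class name, the helper method and the 60-second value are assumptions made for illustration, not values taken from the question. The plain string keys are the same names exposed by ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG and ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG. The broker also limits the accepted range for the session timeout, so on a managed service a larger value may be rejected.

```java
import java.util.Properties;

public class SessionTimeoutSketch {

    // Hypothetical helper: copies the given properties, sets session.timeout.ms,
    // and sets heartbeat.interval.ms to one third of the session timeout.
    static Properties withSessionTimeout(Properties base, int sessionTimeoutMs) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty("session.timeout.ms", String.valueOf(sessionTimeoutMs));
        props.setProperty("heartbeat.interval.ms", String.valueOf(sessionTimeoutMs / 3));
        return props;
    }

    public static void main(String[] args) {
        // 60 seconds is an illustrative value only; tune it for your workload.
        Properties props = withSessionTimeout(new Properties(), 60_000);
        System.out.println("session.timeout.ms=" + props.getProperty("session.timeout.ms"));
        System.out.println("heartbeat.interval.ms=" + props.getProperty("heartbeat.interval.ms"));
    }
}
```

The resulting entries can be merged into the same props object that is passed to the KStreams application.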