I am attempting to use <KStream>.process() with TimeWindows.of("name", 30000) to batch up some KTable values and send them on. It seems that 30 seconds exceeds the consumer session timeout, after which Kafka considers the consumer defunct and releases its partitions.
I've tried increasing the poll frequency and the commit interval to avoid this:
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000");  // commit no more often than every 5 seconds
config.put(StreamsConfig.POLL_MS_CONFIG, "5000");             // block at most 5 seconds per internal poll()
Unfortunately these errors are still occurring:
(lots of these)
ERROR o.a.k.s.p.internals.RecordCollector - Error sending record to topic kafka_test1-write_aggregate2-changelog
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for kafka_test1-write_aggregate2-changelog-0
Followed by these:
INFO o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator 12.34.56.7:9092 (id: 2147483547 rack: null) dead for group kafka_test1
WARN o.a.k.s.p.internals.StreamThread - Failed to commit StreamTask #0_0 in thread [StreamThread-1]:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
Clearly I need to be sending heartbeats back to the server more often. How?
My topology is:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);

// Aggregate values by key into 30-second windows.
KTable<Windowed<String>, String> kt = lines.aggregateByKey(
        new DBAggregateInit(),
        new DBAggregate(),
        TimeWindows.of("write_aggregate2", 30000));

// Forward the windowed aggregates to the custom processor.
DBProcessorSupplier dbProcessorSupplier = new DBProcessorSupplier();
kt.toStream().process(dbProcessorSupplier);

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
The KTable is grouping values by key every 30 seconds. In Processor.init() I call context.schedule(30000).
DBProcessorSupplier provides an instance of DBProcessor. This is an implementation of AbstractProcessor where all the overrides have been provided. All they do is log, so I know when each one is being hit.
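Roughly, DBProcessor looks like this (a simplified sketch based on the description above; the real method bodies just log, and DBProcessorSupplier simply returns a new DBProcessor from get()):

import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class DBProcessor extends AbstractProcessor<Windowed<String>, String> {

    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        context.schedule(30000);  // ask the runtime to call punctuate() every 30 seconds
        System.out.println("init");
    }

    @Override
    public void process(Windowed<String> key, String value) {
        System.out.println("process: " + key + " -> " + value);
    }

    @Override
    public void punctuate(long timestamp) {
        System.out.println("punctuate: " + timestamp);  // the call I never reliably see
    }

    @Override
    public void close() {
        System.out.println("close");
    }
}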
It's a pretty simple topology, but it's clear I'm missing a step somewhere.
I get that I can adjust this on the server side, but I'm hoping there is a client-side solution. I like the notion of partitions being made available quickly when a client exits or dies.
In an attempt to simplify the problem I removed the aggregation step from the graph. It's now just consumer->processor(). (If I send the consumer directly to .print() it works very quickly, so I know it's OK.) (Similarly, if I output the aggregation (KTable) via .print() it seems OK too.)
What I found was that .process(), which should be calling .punctuate() every 30 seconds, is actually blocking for variable lengths of time and outputting somewhat randomly (if at all).
I set the debug level to 'debug' and reran. I'm seeing lots of messages:
DEBUG o.a.k.s.p.internals.StreamTask - Start processing one record [ConsumerRecord <info>
but a breakpoint in the .punctuate() function isn't getting hit. So it's doing lots of work but not giving me a chance to use it.
Kafka Streams is a client library for processing and analyzing data stored in Kafka. It builds upon important stream processing concepts such as properly distinguishing between event time and processing time, windowing support, and simple yet efficient management and real-time querying of application state.
The default timeout is 1 minute. To change it, open the Kafka Client Configuration > Producer tab > Advanced Properties, add max.block.ms, and set it to the desired value (in milliseconds).
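If you configure the producer programmatically rather than through that UI, the equivalent is a single property (a sketch; the 120000 here is just an illustrative value):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
// max.block.ms bounds how long send() and metadata requests may block (default 60000 ms).
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "120000");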
The consumer session timeout is a crucial configuration for group stability. When there is a member failure, the group must pause in order to rebalance partitions. If the failure is spurious, then we often get two rebalances: one for the member failure and one for the failed member when it rejoins.
session.timeout.ms: The timeout used to detect client failures when using Kafka's group management facility. The client sends periodic heartbeats to indicate its liveness to the broker.
A few clarifications:

- StreamsConfig.COMMIT_INTERVAL_MS_CONFIG is a lower bound on the commit interval, i.e., after a commit, the next commit happens no earlier than this amount of time later. Basically, Kafka Streams tries to commit as soon as possible after this time has passed, but there is no guarantee how long the next commit will actually take.
- StreamsConfig.POLL_MS_CONFIG is used for the internal KafkaConsumer#poll() call, to specify the maximum blocking time of the poll() call.

Thus, neither value helps you heartbeat more often.
Kafka Streams follows a "depth-first" strategy when processing records. This means that, after a poll(), all operators of the topology are executed for each record in turn. Assume you have three consecutive maps: all three maps will be called for the first record before the next/second record gets processed.
Thus, the next poll() call will be made only after all records of the first poll() have been fully processed. If you want to heartbeat more often, you need to make sure that a single poll() call fetches fewer records, so that processing all of them takes less time and the next poll() will be triggered earlier.
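To make the depth-first order concrete, here is a toy sketch (the topic name and logging lambdas are hypothetical, not from the question) whose output shows record 1 passing through all three maps before record 2 starts:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> in = builder.stream("input-topic");  // hypothetical topic
in.mapValues(v -> { System.out.println("map1: " + v); return v; })
  .mapValues(v -> { System.out.println("map2: " + v); return v; })
  .mapValues(v -> { System.out.println("map3: " + v); return v; });
// For a poll() that returns r1 and r2, the log reads:
//   map1: r1, map2: r1, map3: r1, map1: r2, map2: r2, map3: r2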
You can use configuration parameters for KafkaConsumer that you can specify via StreamsConfig to get this done (see https://kafka.apache.org/documentation.html#consumerconfigs):

streamConfig.put(ConsumerConfig.XXX, VALUE);
- max.poll.records: if you decrease this value, fewer records will be polled.
- session.timeout.ms: if you increase this value, there is more time for processing data (adding this for completeness, because it is actually a client setting and not a server/broker-side configuration -- even if you are aware of this solution and do not like it :))
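For example (the concrete values here are only illustrative starting points):

streamConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);      // poll fewer records at a time
streamConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);  // allow more processing time between polls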
EDIT

As of Kafka 0.10.1 it is possible (and recommended) to prefix consumer and producer configs within the streams config. This avoids parameter conflicts, as some parameter names are used for both consumer and producer and cannot be distinguished otherwise (and would be applied to consumer and producer at the same time). To prefix a parameter you can use StreamsConfig#consumerPrefix() or StreamsConfig#producerPrefix(), respectively. For example:

streamsConfig.put(StreamsConfig.consumerPrefix(ConsumerConfig.PARAMETER), VALUE);
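A concrete instance of this pattern (the parameters and values here are only illustrative):

// Applies only to the internal consumer:
streamsConfig.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);
// Applies only to the internal producer:
streamsConfig.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 10);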
One more thing to add: the scenario described in this question is a known issue, and there is already KIP-62, which introduces a background thread for KafkaConsumer that sends heartbeats, thus decoupling heartbeats from poll() calls. Kafka Streams will leverage this new feature in upcoming releases.