Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Kafka KStreams - processing timeouts

I am attempting to use <KStream>.process() with a TimeWindows.of("name", 30000) to batch up some KTable values and send them on. It seems that 30 seconds exceeds the consumer timeout interval after which Kafka considers said consumer to be defunct and releases the partition.

I've tried upping the frequency of poll and commit interval to avoid this:

config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000");
config.put(StreamsConfig.POLL_MS_CONFIG, "5000");

Unfortunately these errors are still occurring:

(lots of these)

ERROR  o.a.k.s.p.internals.RecordCollector - Error sending record to topic kafka_test1-write_aggregate2-changelog 
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for kafka_test1-write_aggregate2-changelog-0

Followed by these:

INFO   o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator 12.34.56.7:9092 (id: 2147483547 rack: null) dead for group kafka_test1
WARN   o.a.k.s.p.internals.StreamThread - Failed to commit StreamTask #0_0 in thread [StreamThread-1]: 
  org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)

Clearly I need to be sending heartbeats back to the server more often. How?

My topology is:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);
KTable<Windowed<String>, String>  kt = lines.aggregateByKey(
            new DBAggregateInit(),
            new DBAggregate(),
            TimeWindows.of("write_aggregate2", 30000));

DBProcessorSupplier dbProcessorSupplier = new DBProcessorSupplier();

kt.toStream().process(dbProcessorSupplier);
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();

The KTable is grouping values by key every 30 seconds. In Processor.init() I call context.schedule(30000).

DBProcessorSupplier provides an instance of DBProcessor. This is an implementation of AbstractProcessor where all the overrides have been provided. All they do is LOG so I know when each is being hit.

It's a pretty simple topology but it's clear I'm missing a step somewhere.


Edit:

I get that I can adjust this on the server side but Im hoping there is a client-side solution. I like the notion of partitions being made available pretty quickly when a client exits / dies.


Edit:

In an attempt to simplify the problem I removed the aggregation step from the graph. It's now just consumer->processor(). (If I send the consumer directly to .print() it works v quickly so I know it's ok). (Similarly If I output the aggregation (KTable) via .print() it seems ok too).

What I found was that the .process() - which should be calling .punctuate() every 30 seconds is actually blocking for variable lengths of time and outputting somewhat randomly (if at all).

  • Main program
  • Debug output
  • Processor Supplier
  • Processor

Further:

I set the debug level to 'debug' and reran. Im seeing lots of messages:

DEBUG  o.a.k.s.p.internals.StreamTask - Start processing one record [ConsumerRecord <info>

but a breakpoint in the .punctuate() function isn't getting hit. So it's doing lots of work but not giving me a chance to use it.

like image 838
ethrbunny Avatar asked Aug 30 '16 16:08

ethrbunny


People also ask

Does Kafka support stream processing?

Kafka Streams is a client library for processing and analyzing data stored in Kafka. It builds upon important stream processing concepts such as properly distinguishing between event time and processing time, windowing support, and simple yet efficient management and real-time querying of application state.

How do you increase Kafka timeout?

The default timeout is 1 minute, to change it, open the Kafka Client Configuration > Producer tab > Advance Properties > add max.block.ms and set to desired value (in milliseconds).

What is consumer timeout in Kafka?

The consumer session timeout is a crucial configuration for group stability. When there is a member failure, the group must pause in order to rebalance partitions. If the failure is spurious, then we often get two rebalances: one for the member failure and one for the failed member when it rejoins.

What is Kafka session timeout MS?

session.timeout.msThe timeout used to detect client failures when using Kafka's group management facility. The client sends periodic heartbeats to indicate its liveness to the broker.


1 Answers

A few clarifications:

  • StreamsConfig.COMMIT_INTERVAL_MS_CONFIG is a lower bound on the commit interval, ie, after a commit, the next commit happens not before this time passed. Basically, Kafka Stream tries to commit ASAP after this time passed, but there is no guarantee whatsoever how long it will actually take to do the next commit.
  • StreamsConfig.POLL_MS_CONFIG is used for the internal KafkaConsumer#poll() call, to specify the maximum blocking time of the poll() call.

Thus, both values are not helpful to heartbeat more often.

Kafka Streams follows a "depth-first" strategy when processing record. This means, that after a poll() for each record all operators of the topology are executed. Let's assume you have three consecutive maps, than all three maps will be called for the first record, before the next/second record will get processed.

Thus, the next poll() call will be made, after all record of the first poll() got fully processed. If you want to heartbeat more often, you need to make sure, that a single poll() call fetches less records, such that processing all records takes less time and the next poll() will be triggered earlier.

You can use configuration parameters for KafkaConsumer that you can specify via StreamsConfig to get this done (see https://kafka.apache.org/documentation.html#consumerconfigs):

streamConfig.put(ConsumerConfig.XXX, VALUE);

  • max.poll.records: if you decrease this value, less record will be polled
  • session.timeout.ms: if you increase this value, there is more time for processing data (adding this for completeness because it is actually a client setting and not a server/broker side configuration -- even if you are aware of this solution and do not like it :))

EDIT

As of Kafka 0.10.1 it is possible (and recommended) to prefix consumer and procuder configs within streams config. This avoids parameter conflicts as some parameter names are used for consumer and producer and cannot be distinguiesh otherwise (and would be applied to consumer and producer at the same time). To prefix a parameter you can use StreamsConfig#consumerPrefix() or StreamsConfig#producerPrefix(), respectively. For example: streamsConfig.put(StreamsConfig.consumerPrefix(ConsumerConfig.PARAMETER), VALUE);

One more thing to add: The scenario described in this question is a known issue and there is already KIP-62 that introduces a background thread for KafkaConsumer that send heartbeats, thus decoupling heartbeats from poll() calls. Kafka Streams will leverage this new feature in upcoming releases.

like image 180
Matthias J. Sax Avatar answered Oct 15 '22 18:10

Matthias J. Sax