 

Why do the offsets of the consumer-group (app-id) of my Kafka Streams Application get reset after application restart?

I have a Kafka Streams application for which, whenever I restart it, the offsets for the topic it is consuming get reset. Hence, for all partitions, the lags increase and the app needs to reprocess all the data.

UPDATE: After the app restarts, the output topic receives a burst of events that were already processed; it is not that the input topic offsets are getting reset, as I said in the previous paragraph. However, the internal topic (KTABLE-SUPPRESS-STATE-STORE) offsets are getting reset; see comments below.

I have ensured the lag is 1 for every partition before the restart (this is for the output topic). All consumers that belong to that consumer-group-id (app-id) are active. The restart is quick, taking around 30 secs.

The app uses exactly-once as its processing guarantee.

I have read this answer: How does an offset expire for an Apache Kafka consumer group?

I have tried with auto.offset.reset = latest and auto.offset.reset = earliest.
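For reference, this is roughly how those two settings are wired up (a minimal sketch; the application id and broker address are hypothetical, and the plain string keys are the values behind the StreamsConfig/ConsumerConfig constants). Note in particular that auto.offset.reset is only consulted when the group has no committed offset for a partition, which is why changing it makes no difference when committed offsets exist:

```java
import java.util.Properties;

public class StreamsConfigSketch {
    public static Properties buildConfig() {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");    // hypothetical app id (consumer group id)
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        // String form of StreamsConfig.PROCESSING_GUARANTEE_CONFIG:
        props.put("processing.guarantee", "exactly_once");
        // Only used when NO committed offset exists for a partition;
        // it does not discard already-committed offsets on restart.
        props.put("auto.offset.reset", "earliest");
        return props;
    }
}
```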

It seems like the offsets for these topics are not effectively committed (but I am not sure about this).

I assume that after the restart the app should pick up from the latest committed offset for that consumer group.

UPDATE: I assume this for the internal topic (KTABLE-SUPPRESS-STATE-STORE)

Does the Kafka Streams API ensure that all consumed offsets are committed before shutting down (after calling streams.close())?
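For context, the standard clean-shutdown pattern looks roughly like the sketch below: a JVM shutdown hook calls close(), which flushes state stores and commits pending offsets before returning. KafkaStreams is replaced here by a minimal stand-in interface so the sketch is self-contained, but the method names match the real API:

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;

public class GracefulShutdown {
    // Minimal stand-in for org.apache.kafka.streams.KafkaStreams, so this
    // sketch compiles without the Kafka dependency; the real class exposes
    // the same start() and close(Duration) methods.
    public interface Streams {
        void start();
        boolean close(Duration timeout);
    }

    // Register a shutdown hook so that SIGTERM / Ctrl-C triggers close(),
    // which commits pending offsets and flushes state before returning.
    public static Thread registerShutdownHook(Streams streams, CountDownLatch latch) {
        Thread hook = new Thread(() -> {
            streams.close(Duration.ofSeconds(30)); // bounded clean shutdown
            latch.countDown();
        });
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }
}
```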

I would really appreciate any clue about this.

UPDATE:

This is the code the App executes:

final StreamsBuilder builder = new StreamsBuilder();
final KStream<..., ...> events = builder
        .stream(inputTopicNames, Consumed.with(..., ...)
                .withTimestampExtractor(...));

events
    .filter((k, v) -> ...)
    .flatMapValues(v -> ...)
    .flatMapValues(v -> ...)
    .selectKey((k, v) -> v)
    .groupByKey(Grouped.with(..., ...))
    .windowedBy(
        TimeWindows.of(Duration.ofSeconds(windowSizeInSecs))
            .advanceBy(Duration.ofSeconds(windowSizeInSecs))
            .grace(Duration.ofSeconds(windowSizeGraceInSecs)))
    .reduce((agg, newValue) -> { // "new" is a reserved word in Java
        ...
        return agg;
    })
    .suppress(Suppressed.untilWindowCloses(
                  Suppressed.BufferConfig.unbounded()))
    .toStream()
    .to(outPutTopicNameOfGroupedData, Produced.with(..., ...));

The offset reset always happens after restarting, and only for the KTABLE-SUPPRESS-STATE-STORE internal topic created by the Kafka Streams API.

I have tried with both the exactly-once and at-least-once processing guarantees.

Once again, I will really appreciate any clue about this.

UPDATE: This has been solved in the release 2.2.1 (https://issues.apache.org/jira/browse/KAFKA-7895)

Jonathan Santilli asked Jan 11 '19 11:01

1 Answer

The offset reset always happens (after restarting) with the KTABLE-SUPPRESS-STATE-STORE internal topic created by the Kafka Streams API.

This is currently (version 2.1) expected behavior, because the suppress() operator works in-memory only. Thus, on restart, the suppress buffer must be recreated from the changelog topic before processing can start.

Note that it is planned to let suppress() write to disk in future releases (cf. https://issues.apache.org/jira/browse/KAFKA-7224). This will avoid the overhead of recreating the buffer from the changelog topic.

Matthias J. Sax answered Sep 17 '22 03:09