
Commit offset in Reactor Kafka

I have a Reactor Kafka project that consumes messages from a Kafka topic, transforms each message, and then writes it to another topic.

public Flux<String> consume(String destTopic) {
        return kafkaConsumerTemplate
                .receiveAutoAck()
                .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                        consumerRecord.key(),
                        consumerRecord.value(),
                        consumerRecord.topic(),
                        consumerRecord.offset())
                )
                .doOnNext(s-> sendToKafka(s,destTopic))
                .map(ConsumerRecord::value)
                .doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));
    }

My understanding is the offset is committed only after all the sequence steps are completed successfully in reactor. Is that correct? I want to make sure the next record is not processed unless the current record is successfully sent to the destination Kafka Topic.

asked Mar 27 '26 by perplexedDev
1 Answer

The `receiveAutoAck()` implementation looks like this:

@Override
public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck(Integer prefetch) {
    return withHandler(AckMode.AUTO_ACK, (scheduler, handler) -> handler
        .receive()
        .filter(it -> !it.isEmpty())
        .publishOn(scheduler, preparePublishOnQueueSize(prefetch))
        .map(consumerRecords -> Flux.fromIterable(consumerRecords)
            .doAfterTerminate(() -> {
                for (ConsumerRecord<K, V> r : consumerRecords) {
                    handler.acknowledge(r);
                }
            })));
}

So, each batch of `ConsumerRecords` is ack'ed only when its `Flux` is fully processed, whether it terminates successfully or with an error. It is therefore not a commit-per-record. And technically it does not need to be per record anyway: a commit only matters when the consumer application fails and needs to resume from the offset where it left off. The currently active `KafkaConsumer` keeps its position in memory and does not care whether you commit or not.
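For the ordering concern in the question itself: `doOnNext` is a fire-and-forget side effect, so it does not make the pipeline wait for the send. One way to make the send part of the sequence is `concatMap`, which processes records strictly one at a time. This is a sketch, not the questioner's exact code: it assumes a hypothetical `sendToKafkaReactive` helper that returns a `Mono` (for example, one backed by `KafkaSender.send(...)`) rather than the original void `sendToKafka`.

```java
// Sketch: assumes kafkaConsumerTemplate is a configured
// ReactiveKafkaConsumerTemplate<String, String> and that
// sendToKafkaReactive(value, topic) is a hypothetical helper
// returning a Mono that completes when the record has been
// published to the destination topic.
public Flux<String> consume(String destTopic) {
    return kafkaConsumerTemplate
            .receiveAutoAck()
            // concatMap subscribes to one inner Mono at a time, so the
            // next record is not processed until the current send completes.
            .concatMap(consumerRecord ->
                    sendToKafkaReactive(consumerRecord.value(), destTopic)
                            .thenReturn(consumerRecord.value()))
            .doOnError(throwable ->
                    log.error("Error while consuming: {}", throwable.getMessage()));
}
```

With `flatMap` instead of `concatMap`, sends would run concurrently and ordering would be lost, so `concatMap` is the operator that matches the "next record only after the current one succeeds" requirement.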

If you really want per-record acknowledgement, see `ReactiveKafkaConsumerTemplate.receive()` and its `KafkaReceiver.receive()` delegate:

/**
 * Starts a Kafka consumer that consumes records from the subscriptions or partition
 * assignments configured for this receiver. Records are consumed from Kafka and delivered
 * on the returned Flux when requests are made on the Flux. The Kafka consumer is closed
 * when the returned Flux terminates.
 * <p>
 * Every record must be acknowledged using {@link ReceiverOffset#acknowledge()} in order
 * to commit the offset corresponding to the record. Acknowledged records are committed
 * based on the configured commit interval and commit batch size in {@link ReceiverOptions}.
 * Records may also be committed manually using {@link ReceiverOffset#commit()}.
 *
 * @return Flux of inbound receiver records that are committed only after acknowledgement
 */
default Flux<ReceiverRecord<K, V>> receive() {
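Combining the two ideas, a per-record variant might look like the following sketch. It assumes the same hypothetical `sendToKafkaReactive` helper returning a `Mono`; the `ReceiverRecord.receiverOffset().acknowledge()` call is the real Reactor Kafka API.

```java
// Sketch: each record is acknowledged only after the send to the
// destination topic succeeds; acknowledged offsets are then committed
// per the commit interval / batch size in ReceiverOptions.
public Flux<String> consumeWithManualAck(String destTopic) {
    return kafkaConsumerTemplate
            .receive()
            .concatMap(receiverRecord ->
                    sendToKafkaReactive(receiverRecord.value(), destTopic)
                            // acknowledge marks the offset as processed only on success
                            .doOnSuccess(ignored ->
                                    receiverRecord.receiverOffset().acknowledge())
                            .thenReturn(receiverRecord.value()));
}
```

If a send fails, the record is never acknowledged, so after a restart the consumer resumes from the last committed offset and the record is redelivered.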
answered Mar 31 '26 by Artem Bilan