Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Conditional logic on a Reactor Flux

I am a Reactor newbie. I am trying to develop the following application logic:

  1. Read messages from a Kafka topic source.
  2. Transform the massages.
  3. Write a subset of the transformed messages to a new Kafka topic target.
  4. Explicitly acknowledge the reading operation for all the messages originally read from topic source.

The only solution I found is to rewrite the above business logic as it follows.

  1. Read messages from a Kafka topic source.
  2. Transform the massages.
  3. Immediately acknowledge the message not be written to topic target.
  4. Filter all the above messages.
  5. Write the rest of the transformed messages to the new Kafka topic target.
  6. Explicitly acknowledge the reading operation for these messages

The code implementing the second logic is the following:

receiver.receive()
        .flatMap(this::processMessage)
        .map(this::acknowledgeMessagesNotToWriteInKafka)
        .filter(this::isMessageToWriteInKafka)
        .as(this::sendToKafka)
        .doOnNext(r -> r.correlationMetadata().acknowledge());

Clearly, receiver type is KafkaReceiver, and method sendToKafka uses a KafkaSender. One of the things I don't like is that I am using a map to acknowledge some messages.

Is there any better solution to implement the original logic?

like image 246
riccardo.cardin Avatar asked Oct 13 '25 05:10

riccardo.cardin


1 Answers

This is not exactly your four business logic steps, but I think it's a little bit closer to what you want.

You could acknowledge the "discarded" messages that won't be written in .doOnDiscard after .filter...

receiver.receive()
        .flatMap(this::processMessage)
        .filter(this::isMessageToWriteInKafka)
        .doOnDiscard(ReceiverRecord.class, record -> record.receiverOffset().acknowledge())
        .as(this::sendToKafka)
        .doOnNext(r -> r.correlationMetadata().acknowledge());

Note: you'll need to use the proper object type that was discarded. I don't know what type of object the Publisher returned from processMessage emits, but I assume you can get the ReceiverRecord or ReceiverOffset from it in order to acknowledge it.

Alternatively, you could combine filter/doOnDiscard into a single .handle operator...

receiver.receive()
        .flatMap(this::processMessage)
        .handle((m, sink) -> {
            if (isMessageToWriteInKafka(m)) {
                sink.next(m);
            } else {
                m.getReceiverRecord().getReceiverOffset().acknowledge();
            }
        })
        .as(this::sendToKafka)
        .doOnNext(r -> r.correlationMetadata().acknowledge());
like image 95
Phil Clay Avatar answered Oct 14 '25 21:10

Phil Clay