I am a Reactor newbie. I am trying to develop the following application logic:
source
.target
.source
The only solution I found is to rewrite the above business logic as follows.
source
.target
.target
The code implementing the second logic is the following:
receiver.receive()
    .flatMap(this::processMessage)
    .map(this::acknowledgeMessagesNotToWriteInKafka)
    .filter(this::isMessageToWriteInKafka)
    .as(this::sendToKafka)
    .doOnNext(r -> r.correlationMetadata().acknowledge());
Clearly, the type of receiver is KafkaReceiver, and the method sendToKafka uses a KafkaSender. One of the things I don't like is that I am using a map to acknowledge some messages.
Is there any better solution to implement the original logic?
This is not exactly your four business logic steps, but I think it's a little bit closer to what you want.
You could acknowledge the "discarded" messages that won't be written in a .doOnDiscard placed after .filter:
receiver.receive()
    .flatMap(this::processMessage)
    .filter(this::isMessageToWriteInKafka)
    .doOnDiscard(ReceiverRecord.class, record -> record.receiverOffset().acknowledge())
    .as(this::sendToKafka)
    .doOnNext(r -> r.correlationMetadata().acknowledge());
Note: you'll need to use the proper type of the object that was discarded. I don't know what type of object the Publisher returned from processMessage emits, but I assume you can get the ReceiverRecord or ReceiverOffset from it in order to acknowledge it.
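To make the note above concrete, here is a minimal sketch of a result type that keeps the offset reachable after processMessage. It is an assumption, not your actual code: ProcessedMessage, Acknowledgeable and DiscardHooks are hypothetical names, and Acknowledgeable merely stands in for reactor-kafka's ReceiverOffset so that the snippet compiles on a bare JDK.

```java
import java.util.function.Consumer;

// Stand-in for reactor-kafka's ReceiverOffset, reduced to the one call used here (assumption).
interface Acknowledgeable {
    void acknowledge();
}

// Hypothetical result type of processMessage: the processed payload plus the
// offset of the record it came from, so later operators can still acknowledge it.
final class ProcessedMessage {
    private final String payload;
    private final boolean toWriteInKafka;
    private final Acknowledgeable offset;

    ProcessedMessage(String payload, boolean toWriteInKafka, Acknowledgeable offset) {
        this.payload = payload;
        this.toWriteInKafka = toWriteInKafka;
        this.offset = offset;
    }

    String payload() { return payload; }

    boolean isToWriteInKafka() { return toWriteInKafka; }

    Acknowledgeable offset() { return offset; }
}

final class DiscardHooks {
    // Hook that acknowledges a message dropped by the filter.
    static final Consumer<ProcessedMessage> ACK_DISCARDED = m -> m.offset().acknowledge();
}
```

With a type like this, and once the stand-in interface is replaced by the real ReceiverOffset, the hook in the pipeline would read .doOnDiscard(ProcessedMessage.class, DiscardHooks.ACK_DISCARDED).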
Alternatively, you could combine filter/doOnDiscard into a single .handle operator:
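receiver.receive()
    .flatMap(this::processMessage)
    .handle((m, sink) -> {
        if (isMessageToWriteInKafka(m)) {
            // Forward downstream; it is acknowledged after the send completes.
            sink.next(m);
        } else {
            // Not forwarded, so acknowledge it right away.
            // getReceiverRecord() is a placeholder accessor on your message type.
            m.getReceiverRecord().receiverOffset().acknowledge();
        }
    })
    .as(this::sendToKafka)
    .doOnNext(r -> r.correlationMetadata().acknowledge());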
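If you want to unit-test that branching without Kafka or Reactor, one option is to pull the body of the handle lambda into a small static method. The sketch below is only an illustration under assumptions: WriteOrAcknowledge is a hypothetical name, and the shouldWrite, acknowledge and sink parameters stand in for isMessageToWriteInKafka, the offset acknowledgement and the sink's next method respectively.

```java
import java.util.function.Consumer;
import java.util.function.Predicate;

final class WriteOrAcknowledge {
    // Generic over the message type M, so it does not depend on any Kafka class.
    //   shouldWrite : stands in for isMessageToWriteInKafka
    //   acknowledge : stands in for acknowledging the message's offset
    //   sink        : stands in for the sink's next method inside handle
    static <M> void handle(M message,
                           Predicate<M> shouldWrite,
                           Consumer<M> acknowledge,
                           Consumer<M> sink) {
        if (shouldWrite.test(message)) {
            // Forwarded downstream; acknowledged later, after the send.
            sink.accept(message);
        } else {
            // Dropped here, so it is acknowledged immediately.
            acknowledge.accept(message);
        }
    }
}
```

Inside the pipeline, the lambda passed to .handle would then delegate to this method, passing sink::next as the last argument. Each message goes to exactly one of the two consumers, which is the property the filter/doOnDiscard pair gives you as well.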