So my use-case is to consume messages from Kafka in a Spring Webflux application while programming in the reactive style using Project Reactor, and to perform a non-blocking operation for each message in the same order as the messages were received from Kafka. The system should also be able to recover on its own.
Here is the code snippet that is set up to consume from Kafka:
Flux<ReceiverRecord<Integer, DataDocument>> messages = Flux.defer(() -> {
    KafkaReceiver<Integer, DataDocument> receiver = KafkaReceiver.create(options);
    return receiver.receive();
});
messages.map(this::transformToOutputFormat)
    .map(this::performAction)
    .flatMapSequential(receiverRecordMono -> receiverRecordMono)
    .doOnNext(record -> record.receiverOffset().acknowledge())
    .doOnError(error -> logger.error("Error receiving record", error))
    .retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
    .subscribe();
As you can see, what I do is: take the message from Kafka, transform it into an object intended for a new destination, send it to the destination, and then acknowledge the offset to mark the message as consumed and processed. It is critical to acknowledge the offsets in the same order as the messages are consumed from Kafka, so that we don't move the offset beyond messages that were not fully processed (including sending some data to the destination). Hence I'm using flatMapSequential to ensure this.
For simplicity, let's assume the transformToOutputFormat() method is an identity transform.
public ReceiverRecord<Integer, DataDocument> transformToOutputFormat(ReceiverRecord<Integer, DataDocument> record) {
    return record;
}
The performAction() method needs to do something over the network, say call an HTTP REST API. The appropriate APIs return a Mono, which means the chain needs to be subscribed to. Also, I need the ReceiverRecord to be returned by this method so that the offset can be acknowledged after the flatMapSequential() operator above. Because I need the Mono subscribed to, I'm using flatMapSequential above. If not, I could have used a map instead.
public Mono<ReceiverRecord<Integer, DataDocument>> performAction(ReceiverRecord<Integer, DataDocument> record) {
    return Mono.just(record)
        .flatMap(receiverRecord ->
            HttpClient.create()
                .port(3000)
                .get()
                .uri("/makeCall?data=" + receiverRecord.value().getData())
                .responseContent()
                .aggregate()
                .asString()
        )
        .retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
        .then(Mono.just(record));
}
I have two conflicting needs in this method:
1. Subscribe to the chain that makes the HTTP call
2. Return the ReceiverRecord
Using a flatMap() means my return type changes to a Mono. Using doOnNext() in the same place would retain the ReceiverRecord in the chain, but would not allow the HttpClient response to be subscribed to automatically.
I can't add .subscribe() after asString(), because I want to wait until the HTTP response is completely received before the offset is acknowledged.
I can't use .block() either, since the code runs on a parallel thread, where blocking is not permitted.
As a result, I need to cheat and return the record object from the method scope.
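As an aside, mapping the response back to the input is a common shape for this kind of method, so capturing the method parameter is not necessarily a cheat. The sketch below shows the idea with the JDK's CompletableFuture instead of Reactor so that it runs without extra dependencies. The Message class and callRemote method are hypothetical stand-ins for ReceiverRecord and the HTTP call.

```java
import java.util.concurrent.CompletableFuture;

class CarryRecordDemo {

    // Hypothetical stand-in for ReceiverRecord: just an offset and a payload.
    static final class Message {
        final long offset;
        final String data;

        Message(long offset, String data) {
            this.offset = offset;
            this.data = data;
        }
    }

    // Hypothetical stand-in for the non-blocking HTTP call.
    static CompletableFuture<String> callRemote(Message message) {
        return CompletableFuture.supplyAsync(() -> "ok:" + message.data);
    }

    // The response is discarded; the input record is emitted once the call completes.
    static CompletableFuture<Message> performAction(Message message) {
        return callRemote(message).thenApply(response -> message);
    }

    public static void main(String[] args) {
        Message message = new Message(42L, "payload");
        Message result = performAction(message).join();
        System.out.println(result == message);
    }
}
```

In Reactor, Mono.thenReturn(record) expresses the same intent as the .then(Mono.just(record)) used in performAction.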
The other thing is that on a retry inside performAction it switches threads. Since flatMapSequential() eagerly subscribes to each Mono in the outer flux, this means that while acknowledgement of offsets can be guaranteed in order, we can't guarantee that the HTTP call in performAction will be performed in the same order.
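To make the ordering problem concrete, here is a minimal sketch of "start eagerly, consume results in source order". It uses the JDK's CompletableFuture as an analogy, not Reactor itself, and the latches are an assumption added only to force the worst case deterministically: each simulated call finishes after the one that follows it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class EagerOrderingDemo {

    /** Returns two lists: the order in which calls completed, then the order of acknowledgements. */
    static List<List<Integer>> run() {
        int n = 3;
        ExecutorService pool = Executors.newFixedThreadPool(n);
        List<Integer> completedOrder = new CopyOnWriteArrayList<>();
        List<Integer> ackOrder = new ArrayList<>();
        // done[i] opens once the simulated call for record i has finished.
        CountDownLatch[] done = new CountDownLatch[n];
        for (int i = 0; i < n; i++) {
            done[i] = new CountDownLatch(1);
        }
        try {
            // Eager start: every call is launched before any result is awaited.
            List<CompletableFuture<Integer>> inFlight = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                final int id = i;
                inFlight.add(CompletableFuture.supplyAsync(() -> {
                    // Force the worst case: record i finishes only after record i + 1.
                    if (id + 1 < n) {
                        await(done[id + 1]);
                    }
                    completedOrder.add(id);
                    done[id].countDown();
                    return id;
                }, pool));
            }
            // Results are consumed in source order, so acknowledgements stay ordered.
            for (CompletableFuture<Integer> call : inFlight) {
                ackOrder.add(call.join());
            }
        } finally {
            pool.shutdown();
        }
        return List.of(List.copyOf(completedOrder), ackOrder);
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        List<List<Integer>> result = run();
        System.out.println("completed: " + result.get(0)); // [2, 1, 0]
        System.out.println("acked:     " + result.get(1)); // [0, 1, 2]
    }
}
```

The acknowledgements come out in source order even though the calls themselves finished in reverse, which is the gap described above.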
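So I have two questions.
1. Is it possible to return the record in a natural way rather than returning the method scope object?
2. Is it possible to ensure that the HTTP calls, and not only the offset acknowledgements, are performed in the same order as the messages?
Here is the solution I have come up with.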
Flux<ReceiverRecord<Integer, DataDocument>> messages = Flux.defer(() -> {
    KafkaReceiver<Integer, DataDocument> receiver = KafkaReceiver.create(options);
    return receiver.receive();
});
messages.map(this::transformToOutputFormat)
    .delayUntil(this::performAction)
    .doOnNext(record -> record.receiverOffset().acknowledge())
    .doOnError(error -> logger.error("Error receiving record", error))
    .retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
    .subscribe();
Instead of using flatMapSequential to subscribe to the performAction Mono and preserve the sequence, I delay the request for more messages from the Kafka receiver until the action for the current message has been performed. This enables the one-at-a-time processing that I need.
As a result, performAction doesn't need to return a Mono of ReceiverRecord. I also simplified it to the following:
public Mono<String> performAction(ReceiverRecord<Integer, DataDocument> record) {
    return HttpClient.create()
        .port(3000)
        .get()
        .uri("/makeCall?data=" + record.value().getData())
        .responseContent()
        .aggregate()
        .asString()
        .retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5));
}
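The one-at-a-time behaviour can be illustrated without Reactor. The sketch below is only an analogy built on the JDK's CompletableFuture: each call is chained so that it starts only after the previous record has been acknowledged. The performAction stand-in and the log list are hypothetical and exist only to make the ordering observable.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class OneAtATimeDemo {

    // Hypothetical stand-in for the non-blocking HTTP call.
    static CompletableFuture<String> performAction(int id, List<String> log) {
        return CompletableFuture.supplyAsync(() -> {
            log.add("call " + id);
            return "response " + id;
        });
    }

    static List<String> run() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (int id = 0; id < 3; id++) {
            final int record = id;
            chain = chain
                // The next call is created only when the previous stage has completed.
                .thenCompose(ignored -> performAction(record, log))
                // Acknowledge only after the call for this record has finished.
                .thenAccept(response -> log.add("ack " + record));
        }
        chain.join();
        return List.copyOf(log);
    }

    public static void main(String[] args) {
        // Prints: [call 0, ack 0, call 1, ack 1, call 2, ack 2]
        System.out.println(run());
    }
}
```

The trade-off is throughput: with strictly sequential processing, a slow call holds up every record behind it.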