Cache the result of a Mono from a WebClient call in a Spring WebFlux web application

I am looking to cache a Mono (only if it is successful) which is the result of a WebClient call.

From reading the Project Reactor addons docs, I don't feel that CacheMono is a good fit, as it caches errors as well, which I do not want.

So instead of using CacheMono I am doing the below:

Cache<MyRequestObject, Mono<MyResponseObject>> myCaffeineCache = 
    Caffeine.newBuilder()
            .maximumSize(100)
            .expireAfterWrite(Duration.ofSeconds(60))
            .build();

MyRequestObject myRequestObject = ...;

Mono<MyResponseObject> myResponseObject = myCaffeineCache.get(myRequestObject,
    requestAsKey -> WebClient.create()
                             .post()
                             .uri("http://www.example.com")
                             .syncBody(requestAsKey)
                             .retrieve()
                             .bodyToMono(MyResponseObject.class)
                             .cache()
                             .doOnError(t -> myCaffeineCache.invalidate(requestAsKey)));

Here I am calling cache on the Mono and then adding it to the caffeine cache.

Any errors will enter doOnError to invalidate the cache.

Is this a valid approach to caching a Mono WebClient response?

mahanhz asked Oct 12 '18 22:10



2 Answers

This is one of the very few use cases where you'd be actually allowed to call non-reactive libraries and wrap them with reactive types, and have processing done in side-effects operators like doOnXYZ, because:

  • Caffeine is an in-memory cache, so as far as I know there's no I/O involved
  • Caches often don't offer strong guarantees about caching values (it's very much "fire and forget")

You can then in this case query the cache to see if a cached version is there (wrap it and return right away), and cache a successful real response in a doOn operator, like this:

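import java.time.Duration;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

public class MyService {

    private final WebClient client;

    // Stores plain response values, not Mono instances
    private final Cache<MyRequestObject, MyResponseObject> myCaffeineCache;

    public MyService() {
        this.client = WebClient.create();
        this.myCaffeineCache = Caffeine.newBuilder().maximumSize(100)
          .expireAfterWrite(Duration.ofSeconds(60)).build();
    }

    public Mono<MyResponseObject> fetchResponse(MyRequestObject request) {

        // getIfPresent returns null on a cache miss
        MyResponseObject cachedVersion = this.myCaffeineCache.getIfPresent(request);
        if (cachedVersion != null) {
           return Mono.just(cachedVersion);
        } else {
           return this.client.post()
                         .uri("http://www.example.com")
                         // syncBody is deprecated since Spring 5.2 in favor of bodyValue
                         .syncBody(request)
                         .retrieve()
                         .bodyToMono(MyResponseObject.class)
                         // doOnNext only runs for a successful response, so errors are never cached
                         .doOnNext(response -> this.myCaffeineCache.put(request, response));
        }
    }
}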

Note that I wouldn't cache reactive types here, since there's no I/O involved nor backpressure once the value is returned by the cache. On the contrary, it's making things more difficult with subscription and other reactive streams constraints.

Also, you're right about the cache operator: it isn't about caching the value per se, but more about replaying what happened to other subscribers. I believe that cache and replay operators are actually synonyms for Flux.
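
To make the "cache plain values, and only on success" idea concrete without any Spring or Caffeine dependency, here is a minimal JDK-only sketch. It is an analogy rather than Reactor code: CompletableFuture stands in for Mono, a ConcurrentHashMap stands in for the Caffeine cache, and SuccessOnlyCache with its get and contains methods are hypothetical names made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/** Hypothetical helper: caches plain values, and only after a successful load. */
class SuccessOnlyCache<K, V> {

    // Stand-in for the Caffeine cache; holds values, not futures or Monos.
    private final ConcurrentMap<K, V> values = new ConcurrentHashMap<>();

    CompletableFuture<V> get(K key, Function<K, CompletableFuture<V>> loader) {
        V cached = values.get(key);
        if (cached != null) {
            // Cache hit: wrap the plain value and return right away.
            return CompletableFuture.completedFuture(cached);
        }
        // Cache miss: run the loader. thenApply only fires when the future
        // completes normally, so a failed load never reaches the map.
        return loader.apply(key).thenApply(value -> {
            if (value != null) {
                values.put(key, value);
            }
            return value;
        });
    }

    boolean contains(K key) {
        return values.containsKey(key);
    }
}
```

In this sketch thenApply plays roughly the role that doOnNext plays in the Reactor version: the write to the cache is a side effect that happens only when a value arrives. Like the Reactor version, it does not deduplicate concurrent misses for the same key, so two callers racing on a cold key may both trigger a load.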

Brian Clozel answered Sep 18 '22 14:09


Actually, you don't have to save errors with CacheMono.

private Cache<MyRequestObject, MyResponseObject> myCaffeineCache;

...

Mono<MyResponseObject> myResponseObject =
        CacheMono.lookup(key -> Mono.justOrEmpty(myCaffeineCache.getIfPresent(key))
                .map(Signal::next), myRequestObject)
                .onCacheMissResume(() -> /* Your web client or other Mono here */)
                .andWriteWith((key, signal) -> Mono.fromRunnable(() ->
                        Optional.ofNullable(signal.get())
                                .ifPresent(value -> myCaffeineCache.put(key, value))));

This may be useful if you later switch to an external cache. Remember to use reactive clients for external caches.

Alexander Pankin answered Sep 19 '22 14:09