
Caching and invalidating cached Mono

I'm running into an issue while trying to cache a Mono returned by a WebClient. The code is something like this:

public Mono<Token> authenticate() {
    return cachedTokenMono = cachedTokenMono
        .switchIfEmpty(
            Mono.defer(() -> getToken())
                .cache(
                    token -> Duration.between(Instant.now(), token.getExpires().toInstant()),
                    (Throwable throwable) -> Duration.ZERO,
                    () -> Duration.ZERO));
}

The intention is that the Mono used to receive a Token is cached until the token expires. After the token expires, the cached Mono becomes empty and a new token is requested. This works as expected, but unfortunately switchIfEmpty() does not actually "switch"; it wraps the source Mono instead. As a result, every call to authenticate() adds another wrapping SwitchIfEmptyMono around the previous one, which creates a memory leak. What is the correct pattern in this case? Is there a way to substitute an empty Mono with a new one?

kobasad asked Jun 07 '26 03:06


1 Answer

You could do something like this:

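// Built once and shared by all callers; nothing is requested until someone subscribes.
private final Mono<Token> authenticateMono = getToken()
        .cache(
                token -> Duration.between(Instant.now(), token.getExpires().toInstant()), // keep until the token expires
                throwable -> Duration.ZERO,  // do not cache errors
                () -> Duration.ZERO);        // do not cache an empty completion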

public Mono<Token> authenticate() {
    return authenticateMono;
}

The idea is to return the same caching Mono<Token> instance from every call to authenticate(). The cache operator then decides, on each subscription, whether to replay the cached value or to resubscribe to the source.

Specifically:

  • If a new subscription arrives and there is no cached value, the cache operator subscribes to the Mono<Token> returned from getToken(), which triggers the token retrieval.
  • If a value has been cached and a new subscription arrives before the cache has timed out, the cache operator emits the cached value to the new subscriber.
  • If a value has been cached and a new subscription arrives after the cache has timed out, the cache operator resubscribes to the Mono<Token> returned from getToken(), which triggers a new token retrieval.
  • If the Mono<Token> returned from getToken() completes with an exception, the exception is not cached (its time-to-live is Duration.ZERO). It propagates to the current subscriber, and the next subscription that arrives triggers the token retrieval again.
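
To make the four cases above concrete, here is a rough plain-Java model of that time-to-live behaviour. It is only a sketch for reasoning about the rules: it is synchronous, it is not how Reactor implements cache, and the names TtlCacheModel, source, ttlForValue and clock are all made up for illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical model of a per-value TTL cache; NOT Reactor's implementation. */
final class TtlCacheModel<T> {
    private final Supplier<T> source;                 // stands in for subscribing to getToken()
    private final Function<T, Duration> ttlForValue;  // how long a given value stays cached
    private final Supplier<Instant> clock;            // injectable so expiry can be simulated
    private T value;
    private Instant validUntil;

    TtlCacheModel(Supplier<T> source, Function<T, Duration> ttlForValue, Supplier<Instant> clock) {
        this.source = source;
        this.ttlForValue = ttlForValue;
        this.clock = clock;
    }

    synchronized T get() {
        Instant now = clock.get();
        if (value != null && now.isBefore(validUntil)) {
            return value;                 // cached and not timed out: replay it
        }
        value = null;                     // nothing cached yet, or the entry timed out
        T fresh = source.get();           // if this throws, nothing is cached and the error propagates
        value = fresh;
        validUntil = now.plus(ttlForValue.apply(fresh));
        return fresh;
    }

    public static void main(String[] args) {
        Instant[] now = { Instant.parse("2024-01-01T00:00:00Z") };
        AtomicInteger retrievals = new AtomicInteger();
        TtlCacheModel<Integer> cache = new TtlCacheModel<>(
                retrievals::incrementAndGet, v -> Duration.ofSeconds(60), () -> now[0]);

        System.out.println(cache.get()); // 1: first call hits the source
        System.out.println(cache.get()); // 1: replayed from the cache
        now[0] = now[0].plusSeconds(61); // simulate the entry timing out
        System.out.println(cache.get()); // 2: the source is called again
    }
}
```

The real operator does the same bookkeeping per subscription rather than per method call, which is why a single shared Mono instance is enough.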

This all assumes that:

  • getToken() does not do any work before a subscriber arrives
  • getToken() retrieves the token for each subscriber
  • you only want one active token for all subscribers

Also note that, depending on your use case, you might want to expire the token slightly before its expiration time to account for clock skew, that is, to retrieve a new token just before the old one actually expires. This prevents returning a Token that will expire before the downstream has had an opportunity to use it.
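
One possible way to apply such a margin is to subtract it when computing the time-to-live. The sketch below uses only java.time. The class name TokenTtl, the ttl helper and the 30-second SAFETY_MARGIN are assumptions chosen for illustration, not part of Reactor or of the original code.

```java
import java.time.Duration;
import java.time.Instant;

final class TokenTtl {
    // Hypothetical safety margin; tune it to your expected clock skew and request latency.
    static final Duration SAFETY_MARGIN = Duration.ofSeconds(30);

    /** Remaining token lifetime minus the margin, clamped so it is never negative. */
    static Duration ttl(Instant now, Instant expires, Duration margin) {
        Duration remaining = Duration.between(now, expires).minus(margin);
        return remaining.isNegative() ? Duration.ZERO : remaining;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-01-01T00:00:00Z");
        System.out.println(ttl(now, now.plusSeconds(300), SAFETY_MARGIN)); // PT4M30S
        System.out.println(ttl(now, now.plusSeconds(10), SAFETY_MARGIN));  // PT0S
    }
}
```

With a helper like this, the first lambda passed to cache could become token -> TokenTtl.ttl(Instant.now(), token.getExpires().toInstant(), TokenTtl.SAFETY_MARGIN), so a token that is about to expire is treated as already stale.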

Phil Clay answered Jun 10 '26 10:06


