Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Caching in Spring 5 WebFlux

Are there any way to cache a Flux coming from WebClient in Spring 5? I tried this but is not caching anything.

@RestController
@SpringBootApplication
@EnableCaching
public class GatewayApplication {

 @PostMapping(value ="/test", produces = "application/json")
 public Flux<String> handleRequest(@RequestBody String body) {
    return getHspadQuery(body);
 }

 @Cacheable("testCache")
 private Flux<String> getData (String body) {
    return WebClient.create().post()
            .uri("http://myurl")
            .body(BodyInserters.fromObject(body))
            .retrieve().bodyToFlux(String.class).cache();
 }
}

When I make the third request it never finishs. And in then in the subsequent requests I get the response but the server throws the following:

2018-04-09 12:36:23.920 ERROR 11488 --- [ctor-http-nio-4] r.ipc.netty.channel.ChannelOperations    : [HttpServer] Error processing connection. Requesting close the channel
reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.emit(FluxBufferPredicate.java:292) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNextNewBuffer(FluxBufferPredicate.java:251) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.tryOnNext(FluxBufferPredicate.java:205) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNext(FluxBufferPredicate.java:180) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:646) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:523) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapInner.onSubscribe(FluxFlatMap.java:897) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:128) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:61) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.Flux.subscribe(Flux.java:6873) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:372) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:108) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:108) ~[reactor-core-3.1.5.RELEASE.jar:3.1.5.RELEASE]
at reactor.ipc.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:211) ~[reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
at reactor.ipc.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:326) ~[reactor-netty-0.7.5.RELEASE.jar:0.7.5.RELEASE]
...

And it never caches anything.

Any help would be appreciated.

Thanks.

like image 442
Esteban S Avatar asked Apr 09 '18 10:04

Esteban S


2 Answers

There is an reactor cache add-on which can be used with Spring CacheManager. But, as pointed out by the comments in the accepted answer, currently, the Spring cache APIs(gets and puts) are still blocking. We can only make the program fully reactive until this issue is solved.

Here is the sample code snippet in java. The complete sample project is here in github.

@Service
public class CatServiceImpl implements CatService {
    private static final String CACHE_NAME = "sr";
    private static final String KEY = "k";
    @Autowired
    private WebClient client;

    @Autowired
    private CacheManager cacheManager;

    @SuppressWarnings("unchecked")
    private Function<String, Mono<List<Signal<CatDto>>>> reader = k -> Mono
            .justOrEmpty((Optional.ofNullable((List<CatDto>) (cacheManager.getCache(CACHE_NAME).get(k, List.class)))))
            .flatMap(v -> Flux.fromIterable(v).materialize().collectList());

    private BiFunction<String, List<Signal<CatDto>>, Mono<Void>> writer = (k, sigs) -> Flux.fromIterable(sigs)
            .dematerialize().collectList().doOnNext(l -> cacheManager.getCache(CACHE_NAME).put(k, l)).then();

    @Override
    public Flux<CatDto> search() {
        Flux<CatDto> fromServer = client.get().retrieve().bodyToFlux(CatDto.class);

        return CacheFlux.lookup(reader, KEY).onCacheMissResume(fromServer).andWriteWith(writer);
    }

}
like image 86
chao_chang Avatar answered Nov 15 '22 12:11

chao_chang


For now, @Cacheable doesn't work with Flux (and Reactor in general). But regarding your example, each time you call the method, you're creating a new Flux instance, so naturally, it never caches anything.

To be able to cache results, you need to either convert a Flux to a list instance, or simply keep reusing one Flux instance

like image 31
Grzegorz Piwowarek Avatar answered Nov 15 '22 10:11

Grzegorz Piwowarek