I'm probably missing something but I can't figure out what it is.
The following code does nothing at all:
webClient.get().uri("/some/path/here").retrieve()
.bodyToMono(GetLocationsResponse.class)
.doOnNext(System.out::println)
.subscribe();
If I try to block the call it works fine:
webClient.get().uri("/some/path/here").retrieve()
.bodyToMono(GetLocationsResponse.class)
.doOnNext(System.out::println)
.block();
The weird thing is that if I create a Flux "manually" (i.e. not coming from the Spring WebClient), this works fine:
Flux.just("1", "2", "3")
.filter(s -> !s.equals("2"))
.doOnNext(System.out::println)
.subscribe();
Could someone please explain what I am doing wrong? Isn't .subscribe() supposed to execute the operation in the first case, just like it did in the last?
Thanks!
To make the data flow, you have to subscribe to the Flux using one of the subscribe() methods. Calling subscribe() tells the Publisher to start sending data. You can also pass functions that will be executed for each kind of signal the Publisher sends once you subscribe.
blockFirst(Duration) subscribes to this Flux and blocks until the upstream signals its first value, completes, or the timeout expires. blockLast() subscribes to this Flux and blocks indefinitely until the upstream signals its last value or completes.
collectList() accumulates the sequence into a Mono<List>; calling block() on that Mono then subscribes to it and blocks indefinitely until the list is emitted.
Mono is comparable to Java's Optional, since it holds 0 or 1 value, and Flux is comparable to List, since it can hold N values.
Short Answer
subscribe() does not block the current thread, which means the app's main thread can finish before the Flux emits any element. So either use block() or make the main thread wait.
Details
A call to the no-args subscribe() just issues request(unbounded) on the Mono or Flux without registering any consumer callbacks. It generally triggers the operation on a separate thread and does not block the current thread. Most likely, your main thread ends before WebClient receives the response on that separate thread, so the passive side effect doOnNext(...) never runs.
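To make the "request(unbounded), then return immediately" idea concrete without a Reactor dependency, here is a minimal sketch built on the JDK's own java.util.concurrent.Flow API. It is an analogy, not Reactor's implementation: SubmissionPublisher stands in for the WebClient pipeline, and UnboundedRequestSketch, CollectingSubscriber and publishAndWait are hypothetical names made up for this example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class UnboundedRequestSketch {

    /** Requests everything up front and records whatever arrives. */
    static final class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            // Unbounded demand: the subscriber never throttles the publisher.
            subscription.request(Long.MAX_VALUE);
        }

        @Override public void onNext(String item) { received.add(item); }
        @Override public void onError(Throwable error) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    /** Publishes the items asynchronously, then waits for the completion signal. */
    static List<String> publishAndWait(List<String> items) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);   // returns immediately, delivery is asynchronous
            items.forEach(publisher::submit);
        }                                      // close() signals onComplete after pending items
        try {
            // Without this wait the caller could return before anything was delivered.
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.copyOf(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(publishAndWait(List.of("1", "2", "3")));
    }
}
```

The point to take away is the explicit wait at the end: subscribing only registers interest, and something else has to keep the calling thread around until the signals arrive.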
To check that the operation really is started, wait for some time in the main thread. Just put the following line right after the subscribe() call:
Thread.sleep(1000);
Now, after adjusting the sleep duration, you should see the result printed.
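A fixed sleep is either too long or too short, so for anything beyond a quick experiment a latch is usually a better way to "employ waiting in the main thread". The sketch below uses only the JDK: a daemon thread stands in for the thread that handles the HTTP response (an assumption made for illustration), and LatchWaitSketch and fetchAndWait are hypothetical names. In Reactor code the countDown() call would typically sit in a terminal callback such as doFinally(...).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class LatchWaitSketch {

    /**
     * Starts simulated async work and waits for it with a latch.
     * Returns the result, or null if the timeout expired first.
     */
    static String fetchAndWait(long simulatedLatencyMillis, long timeoutMillis) {
        CountDownLatch finished = new CountDownLatch(1);
        AtomicReference<String> result = new AtomicReference<>();

        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(simulatedLatencyMillis); // stands in for the network round trip
                result.set("response");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                finished.countDown();                 // signal completion, success or not
            }
        });
        worker.setDaemon(true); // a daemon thread does not keep the JVM alive on its own
        worker.start();

        try {
            // Wakes up as soon as the worker is done, instead of sleeping a fixed time.
            boolean completed = finished.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return completed ? result.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchAndWait(200, 5_000));
    }
}
```

If you drop the await call from main's path, the program can exit before the worker sets the result, which is the same effect as in the original subscribe() example.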
Let's now explicitly supply a custom Scheduler for the async operations and wait for its tasks to complete. Also, let's pass System.out::println as the subscribe(...) argument instead of using doOnNext, so that the complete code looks as follows:
ExecutorService executor = Executors.newSingleThreadExecutor();
webClient.get().uri("/some/path/here").retrieve()
.bodyToMono(GetLocationsResponse.class)
.publishOn(Schedulers.fromExecutor(executor)) // next operation will go to this executor
.subscribe(System.out::println); // still non-blocking
executor.awaitTermination(1, TimeUnit.SECONDS); // blocks the main thread for up to 1 second (the executor is never shut down, so this acts as a timed wait)
This example uses the slightly different subscribe(Consumer) overload. Most importantly, it adds publishOn(Scheduler), backed by an ExecutorService, which the main thread then uses to wait for termination.
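One caveat worth knowing about awaitTermination: it only returns early once the executor has actually been shut down. Otherwise it waits out the full timeout and returns false, so in the snippet above it most likely behaves like a one-second sleep. A small JDK-only sketch of the difference (AwaitTerminationSketch and its method names are hypothetical):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AwaitTerminationSketch {

    /** No shutdown() call: the executor never terminates, so the wait times out. */
    static boolean awaitWithoutShutdown(long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.execute(() -> { });  // trivial task, finishes almost instantly
            return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();       // clean up the worker thread
        }
    }

    /** shutdown() first: the wait returns true once the queued tasks have run. */
    static boolean awaitAfterShutdown(long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.execute(() -> { });
            executor.shutdown();          // stop accepting tasks, let queued ones finish
            return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("without shutdown: " + awaitWithoutShutdown(200));
        System.out.println("after shutdown:   " + awaitAfterShutdown(5_000));
    }
}
```

Note that in the reactive example you cannot simply call shutdown() right after subscribe(...), because the response may not have arrived yet and the executor would then refuse the late task. That is why a latch or block() tends to be the more dependable choice.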
Surely, the much easier way to achieve the same result is to use block(), as you mentioned initially:
webClient.get().uri("/some/path/here").retrieve()
.bodyToMono(GetLocationsResponse.class)
.doOnNext(System.out::println)
.block();
Finally, a note on your third example with Flux.just(...)...subscribe(): it completes before your main thread terminates. That's because Flux.just(...) has no asynchronous boundary, so the few String elements are emitted synchronously on the subscribing (main) thread, whereas the single GetLocationsResponse element has to wait for the request to be written, the response to be read, and the body to be parsed into a POJO on another thread. However, if you make this Flux delay its elements, you'll reproduce the same behavior:
Flux.just("1", "2", "3")
.filter(s -> !s.equals("2"))
.delayElements(Duration.ofMillis(500)) // emission moves off the main thread, so nothing is printed before main exits
.doOnNext(System.out::println)
.subscribe();
Flux.just("1", "2", "3")
.filter(s -> !s.equals("2"))
.delayElements(Duration.ofMillis(500))
.doOnNext(System.out::println)
.blockLast(); // blocks the main thread until the last element, so printing works again
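What changes between those two variants is the thread that does the emitting. As a rough JDK-only illustration (not Reactor code; EmissionThreadSketch and its methods are hypothetical names), the first method below runs its "emission" inline on the caller, much like a source with no async boundary, while the second hands it to a daemon scheduler thread after a delay, which is roughly what a delaying operator does:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class EmissionThreadSketch {

    /** Immediate emission: the callback runs on the thread that triggered it. */
    static boolean immediateRunsOnCaller() {
        Thread caller = Thread.currentThread();
        Thread[] emitter = new Thread[1];
        Runnable emit = () -> emitter[0] = Thread.currentThread();
        emit.run();                          // "subscribe" and emission in one call
        return emitter[0] == caller;
    }

    /** Delayed emission: the callback runs later on a daemon scheduler thread. */
    static boolean delayedRunsOnCaller(long delayMillis) {
        Thread caller = Thread.currentThread();
        ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor(runnable -> {
                    Thread thread = new Thread(runnable, "delay-sketch");
                    thread.setDaemon(true);  // would not keep the JVM alive by itself
                    return thread;
                });
        try {
            Callable<Thread> emit = Thread::currentThread;
            // get(...) blocks the caller until the delayed task has run.
            Thread emitter = scheduler.schedule(emit, delayMillis, TimeUnit.MILLISECONDS)
                    .get(5, TimeUnit.SECONDS);
            return emitter == caller;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("delayed task did not finish", e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("immediate on caller: " + immediateRunsOnCaller());
        System.out.println("delayed on caller:   " + delayedRunsOnCaller(100));
    }
}
```

The blocking get(...) in the second method plays the role that blockLast() plays in the Flux example: it keeps the calling thread waiting until the work on the other thread is done.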