I'm using a spring flux to send parallel requests to a service, this is very simplified version of it:
Flux.fromIterable(customers)
.flatMap { customer ->
client.call(customer)
} ...
I was wondering how I could cancel this flux, as in, grab a reference to the flux somehow and tell it to shut down.
A Flux can be endless, meaning that it can keep emitting elements forever. Also it can return a sequence of elements and then send a completion notification when it has returned all of its elements. In Spring WebFlux, we call reactive APIs/functions that return monos and fluxes and your controllers will return monos and fluxes.
In Spring WebFlux, we call reactive APIs/functions that return monos and fluxes and your controllers will return monos and fluxes. When you invoke an API that returns a mono or a flux, it will return immediately.
Guide to Spring 5 WebFlux. 1 1. Overview. Spring WebFlux is part of Spring 5 and provides reactive programming support for web applications. In this tutorial, we'll be creating a ... 2 2. Spring WebFlux Framework. 3 3. Dependencies. 4 4. Reactive REST Application. 5 5. Reactive RestController. More items
Calling methods on a Flux or Mono (the operators) doesn’t immediately trigger the behavior. Instead, a new instance of Flux (or Mono) is returned, on which you can continue composing further operators. You thus create a chain of operators (or an operator acyclic graph), which represents your asynchronous processing pipeline.
As you probably know, with reactive objects, all operators are lazy. This means execution of the pipeline is delayed until the moment you subscribe to the reactive stream.
So, in your example, there is nothing to cancel yet because nothing is happening at that point.
But supposing your example was extended to:
Disposable disp = Flux.fromIterable(customers)
.flatMap { customer ->
client.call(customer)
}
.subscribe();
Then, as you can see, your subscription returns a Disposable
object that you can use to cancel the entire thing if you want, e.g.
disp.dispose()
Documentation of dispose says:
Cancel or dispose the underlying task or resource.
There’s another section of the documentation that says the following:
These variants [of operators] return a reference to the subscription that you can use to cancel the subscription when no more data is needed. Upon cancellation, the source should stop producing values and clean up any resources it created. This cancel and clean-up behavior is represented in Reactor by the general-purpose
Disposable
interface.
Therefore canceling the execution of stream is not free from complications on the reactive object side, because you want to make sure to leave the world in a consistent state if you cancel the stream in the middle of its processing. For example, if you were in the process of building something, you may want to discard resources, destroy any partial aggregation results, close files, channels, release memory or any other resources you have, potentially undoing changes or compensating for them.
You may want to read the documentation on cleanup about this, such that you also consider what you can do on the reactive object side.
Flux<String> bridge = Flux.create(sink -> {
sink.onRequest(n -> channel.poll(n))
.onCancel(() -> channel.cancel())
.onDispose(() -> channel.close())
});
Answer from @Edwin is precise. As long as you don't call subscribe, there is nothing to cancel, because no code will be executed.
Just wanted to add an example to make it clear.
public static void main(String[] args) throws InterruptedException {
List<String> lists = Lists.newArrayList("abc", "def", "ghi");
Disposable disposable = Flux.fromIterable(lists)
.delayElements(Duration.ofSeconds(3))
.map(String::toLowerCase)
.subscribe(System.out::println);
Thread.sleep(5000); //Sleeping so that some elements in the flux gets printed
disposable.dispose();
Thread.sleep(10000); // Sleeping so that we can prove even waiting for some time nothing gets printed after cancelling the flux
}
But I would say a much cleaner way (functional way) is to make use of functions like takeUntil
or take
. For instance I can stop the stream in the above example like this as well.
List<String> lists = Lists.newArrayList("abc", "def", "End", "ghi");
Flux.fromIterable(lists).takeUntil(s -> s.equalsIgnoreCase("End"))
.delayElements(Duration.ofSeconds(3))
.map(String::toLowerCase)
.subscribe(System.out::println);
or
List<String> lists = Lists.newArrayList("abc", "def", "ghi");
Flux.fromIterable(lists).take(2)
.delayElements(Duration.ofSeconds(2))
.map(String::toLowerCase)
.subscribe(System.out::println);
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With