
How to cancel an ongoing Spring Flux?

I'm using a Spring Flux to send parallel requests to a service. This is a very simplified version of it:

Flux.fromIterable(customers)
  .flatMap { customer ->
     client.call(customer)
  } ...

I was wondering how I could cancel this flux, as in, grab a reference to the flux somehow and tell it to shut down.

kms333 asked Aug 10 '18 12:08



2 Answers

As you probably know, with reactive objects, all operators are lazy. This means execution of the pipeline is delayed until the moment you subscribe to the reactive stream.

So, in your example, there is nothing to cancel yet because nothing is happening at that point.
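To make the laziness concrete, here is a minimal sketch, assuming reactor-core is on the classpath. The class name, the customer names, and the counter that stands in for client.call(customer) are all hypothetical; the point is only that the lambda passed to flatMap does not run until something subscribes.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class LazyFluxDemo {

    // Returns {calls counted before subscribing, calls counted after completion}.
    static int[] countCalls() {
        AtomicInteger calls = new AtomicInteger();

        // Assembling the pipeline only describes the work; nothing runs yet.
        Flux<String> pipeline = Flux.fromIterable(List.of("alice", "bob", "carol"))
                .flatMap(customer -> {
                    calls.incrementAndGet(); // hypothetical stand-in for client.call(customer)
                    return Mono.just("done:" + customer);
                });

        int before = calls.get();

        // blockLast() subscribes and waits until the pipeline completes.
        pipeline.blockLast();

        int after = calls.get();
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] result = countCalls();
        System.out.println("before=" + result[0] + " after=" + result[1]);
    }
}
```

With the three hypothetical customers above, the counter should read 0 before subscribing and 3 once the pipeline has completed.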

But supposing your example was extended to:

Disposable disp = Flux.fromIterable(customers)
  .flatMap { customer ->
     client.call(customer)
  }
  .subscribe();

Then, as you can see, your subscription returns a Disposable object that you can use to cancel the entire thing if you want, e.g.

disp.dispose()

The documentation of dispose says:

Cancel or dispose the underlying task or resource.

There’s another section of the documentation that says the following:

These variants [of operators] return a reference to the subscription that you can use to cancel the subscription when no more data is needed. Upon cancellation, the source should stop producing values and clean up any resources it created. This cancel and clean-up behavior is represented in Reactor by the general-purpose Disposable interface.

Therefore, canceling the execution of a stream is not free from complications on the reactive object side, because you want to leave the world in a consistent state if the stream is cancelled in the middle of its processing. For example, if you were in the process of building something, you may want to discard resources, destroy any partial aggregation results, close files and channels, and release memory or any other resources you hold, potentially undoing changes or compensating for them.

You may want to read the documentation on cleanup, so that you also consider what you can do on the reactive object side.

Flux<String> bridge = Flux.create(sink -> {
    sink.onRequest(n -> channel.poll(n))
        .onCancel(() -> channel.cancel())   // invoked for the cancel signal only
        .onDispose(() -> channel.close());  // invoked for complete, error, or cancel
});
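When you do not own the source, a similar cleanup hook can be attached with an operator. The following is a small sketch rather than a definitive recipe, assuming reactor-core is on the classpath; the class and method names are hypothetical. It uses Flux.never() as a source that stays open forever, so the only way the doFinally callback can fire is through dispose().

```java
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;

public class CancelCleanupDemo {

    // Returns the name of the signal that terminated the pipeline.
    static String signalSeenOnDispose() {
        AtomicReference<SignalType> seen = new AtomicReference<>();

        Disposable disp = Flux.<String>never() // never emits, completes, or fails
                .doFinally(seen::set)          // runs on complete, error, or cancel
                .subscribe();

        // Cancels the subscription; the cancel signal travels upstream
        // and triggers the doFinally callback.
        disp.dispose();

        return seen.get().name();
    }

    public static void main(String[] args) {
        System.out.println(signalSeenOnDispose());
    }
}
```

In a real pipeline the callback would release whatever the stream holds (connections, files, partial results) instead of just recording the signal.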
Edwin Dalorzo answered Nov 04 '22 16:11


The answer from @Edwin is precise. As long as you don't call subscribe, there is nothing to cancel, because no code will be executed.
I just wanted to add an example to make it clear.

public static void main(String[] args) throws InterruptedException {

   List<String> lists = Lists.newArrayList("abc", "def", "ghi");
   Disposable disposable = Flux.fromIterable(lists)
                           .delayElements(Duration.ofSeconds(3))
                           .map(String::toLowerCase)
                           .subscribe(System.out::println);

   Thread.sleep(5000); // Sleep so that the first element (emitted after 3 seconds) gets printed
   disposable.dispose();

   Thread.sleep(10000); // Sleep again to show that nothing more is printed once the flux is cancelled
}

But I would say a much cleaner (more functional) way is to use operators like takeUntil or take. For instance, I can also stop the stream in the above example like this:

List<String> lists = Lists.newArrayList("abc", "def", "End", "ghi");
Flux.fromIterable(lists).takeUntil(s -> s.equalsIgnoreCase("End"))
                           .delayElements(Duration.ofSeconds(3))
                           .map(String::toLowerCase)
                           .subscribe(System.out::println);

or

List<String> lists = Lists.newArrayList("abc", "def", "ghi");
Flux.fromIterable(lists).take(2)
                           .delayElements(Duration.ofSeconds(2))
                           .map(String::toLowerCase)
                           .subscribe(System.out::println);
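One detail worth checking when choosing between these operators is whether the element that triggers the stop is itself emitted. The sketch below, assuming reactor-core is on the classpath, collects the results synchronously instead of printing with delays so the difference is easy to see. The class and method names are hypothetical, and takeWhile is added here for comparison only; it is not part of the answer above.

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class TakeOperatorsDemo {

    static final List<String> ITEMS = List.of("abc", "def", "End", "ghi");

    // takeUntil relays items until the predicate matches, including the matching item.
    static List<String> untilEnd() {
        return Flux.fromIterable(ITEMS)
                .takeUntil(s -> s.equalsIgnoreCase("End"))
                .map(String::toLowerCase)
                .collectList()
                .block();
    }

    // takeWhile relays items while the predicate holds and drops the first item that fails it.
    static List<String> whileNotEnd() {
        return Flux.fromIterable(ITEMS)
                .takeWhile(s -> !s.equalsIgnoreCase("End"))
                .map(String::toLowerCase)
                .collectList()
                .block();
    }

    // take(n) relays the first n items and then cancels the source.
    static List<String> firstTwo() {
        return Flux.fromIterable(ITEMS)
                .take(2)
                .map(String::toLowerCase)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(untilEnd());    // [abc, def, end]
        System.out.println(whileNotEnd()); // [abc, def]
        System.out.println(firstTwo());    // [abc, def]
    }
}
```

So in the takeUntil example above, "end" is still printed before the stream stops; if the marker element should be dropped, takeWhile with the negated predicate is the closer fit.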
pvpkiran answered Nov 04 '22 18:11