Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Rx and tasks - cancel running task when new task is spawned?

I have an user interaction scenario I'd like to handle with Rx.

The scenario is similar to the canonical "when user stops typing, do some work" (usually, search for what the user has typed so far) (1) - but I also need to :

  • (2) only get the latest of the results of "do some work" units (see below)
  • (3) when a new unit of work starts, cancel any work in progress (in my case it's CPU intensive)

For (1) I use an IObservable for the user events, throttled with .Throttle() to only trigger on pauses between events ("user stops typing").

From that, i .Select(_ => CreateMyTask(...).ToObservable()).

This gives me an IObservable<IObservable<T>> where each of the inner observables wraps a single task.

To get (2) I finally apply .Switch() to only get the results from the newest unit of work.

What about (3) - cancel pending tasks ?

If I understand correctly, whenever there's a new inner IObservable<T>, the .Switch() method subscribes to it and unsubscribes from the previous one(s), causing them to Dispose().
Maybe that can be somehow wired to trigger the task to cancel?

like image 749
Cristian Diaconescu Avatar asked Aug 27 '13 23:08

Cristian Diaconescu


People also ask

What is the way to create a task that can be Cancelled after it starts?

Create and start a cancelable task. Pass a cancellation token to your user delegate and optionally to the task instance. Notice and respond to the cancellation request in your user delegate. Optionally notice on the calling thread that the task was canceled.

Which object do you inspect to determine if a long running task will be Cancelled?

The background long-running tasks are cancelled by using the CancellationToken object in Blazor.

How do I stop async tasks?

A task can be cancelled at any time by invoking cancel(boolean). Invoking this method will cause subsequent calls to isCancelled() to return true. After invoking this method, onCancelled(Object), instead of onPostExecute(Object) will be invoked after doInBackground(Object[]) returns.

What is task Cancelled exception?

TaskCanceledException(String, Exception, CancellationToken) Initializes a new instance of the TaskCanceledException class with a specified error message, a reference to the inner exception that is the cause of this exception, and the CancellationToken that triggered the cancellation. TaskCanceledException(Task)


1 Answers

You can just use Observable.FromAsync which will generate tokens that are cancelled when the observer unsubcribes:

input.Throttle(...)
     .Select(_ => Observable.FromAsync(token => CreateMyTask(..., token)))
     .Switch()
     .Subscribe(...);

This will generate a new token for each unit of work and cancel it every time Switch switches to the new one.

like image 78
Brandon Avatar answered Sep 28 '22 05:09

Brandon