Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to cancel a Select in RX if it is not finished before the next event arrives

I have the following setup

IObservable<Data> source = ...;

source
    .Select(data=>VeryExpensiveOperation(data))
    .Subscribe(data=>Console.WriteLine(data));

Normally the events come seperated by a reasonable time frame. Imagine a user updating a text box in a form. Our VeryExpensiveOperation might take 5 seconds to complete and whilst it does an hour glass is displayed on the screen.

However if during the 5 seconds the user updates the textbox again I would want to send a cancelation to the current VeryExpensiveOperation before the new one starts.

I would imagine a scenario like

source
    .SelectWithCancel((data, cancelToken)=>VeryExpensiveOperation(data, token))
    .Subscribe(data=>Console.WriteLine(data));

So every time the lambda is called is is called with a cancelToken which can be used to manage canceling a Task. However now we are mixing Task, CancelationToken and RX. Not quite sure how to fit it all together. Any suggestions.

Bonus Points for figuring out how to test the operator using XUnit :)

FIRST ATTEMPT

    public static IObservable<U> SelectWithCancelation<T, U>( this IObservable<T> This, Func<CancellationToken, T, Task<U>> fn )
    {
        CancellationTokenSource tokenSource = new CancellationTokenSource();

        return This
            .ObserveOn(Scheduler.Default)
            .Select(v=>{
                tokenSource.Cancel();
                tokenSource=new CancellationTokenSource();
                return new {tokenSource.Token, v};
            })
            .SelectMany(o=>Observable.FromAsync(()=>fn(o.Token, o.v)));
    }

Not tested yet. I'm hoping that a task that does not complete generates an IObservable that completes without firing any OnNext events.

like image 370
bradgonesurfing Avatar asked Jul 24 '13 14:07

bradgonesurfing


1 Answers

You have to model VeryExpensiveOperation as an cancellable asynchronous thing. Either a Task or an IObservable. I'll assume it is a task with a CancellationToken:

Task<TResult> VeryExpensiveOperationAsync<TSource, TResult>(TSource item, CancellationToken token);

Then you do it like so:

source
    .Select(item => Observable.DeferAsync(async token =>
    {
        // do not yield the observable until after the operation is completed
        // (ie do not just do VeryExpensiveOperation(...).ToObservable())
        // because DeferAsync() will dispose of the token source as soon
        // as you provide the observable (instead of when the observable completes)
        var result = await VeryExpensiveOperationAsync(item, token);
        return Observable.Return(result);
    })
    .Switch();

The Select just creates a deferred observable that, when subscribed, will create a token and kick off the operation. If the observable is unsubscribed before the operation finishes, the token will be cancelled.

The Switch subscribes to each new observable that comes out of Select, unsubscribing from the previous observable it was subscribed to.

This has the effect you want.

P.S. this is easily testable. Just provide a mock source and a mock VeryExpensiveOperation that uses a TaskCompletetionSource provided by the unit test so the unit test can control exactly when new source items are produced and when tasks are completed. Something like this:

void SomeTest()
{
    // create a test source where the values are how long
    // the mock operation should wait to do its work.
    var source = _testScheduler.CreateColdObservable<int>(...);

    // records the actions (whether they completed or canceled)
    List<bool> mockActionsCompleted = new List<bool>();
    var resultStream = source.SelectWithCancellation((token, delay) =>
    {
        var tcs = new TaskCompletionSource<string>();
        var tokenRegistration = new SingleAssignmentDisposable();

        // schedule an action to complete the task
        var d = _testScheduler.ScheduleRelative(delay, () =>
        {
           mockActionsCompleted.Add(true);
           tcs.SetResult("done " + delay);
           // stop listening to the token
           tokenRegistration.Dispose();
        });

        // listen to the token and cancel the task if the token signals
        tokenRegistration.Disposable = token.Register(() =>
        {
           mockActionsCompleted.Add(false);
           tcs.TrySetCancelled();
           // cancel the scheduled task
           d.Dispose();
        });

        return tcs.Task;
    });

    // subscribe to resultStream
    // start the scheduler
    // assert the mockActionsCompleted has the correct sequence
    // assert the results observed were what you expected.
}

You might run into trouble using testScheduler.Start() due to the new actions scheduled dynamically. a while loop with testScheduler.AdvanceBy(1) might work better.

like image 192
Brandon Avatar answered Oct 16 '22 18:10

Brandon