I have the following setup:
IObservable<Data> source = ...;
source
    .Select(data => VeryExpensiveOperation(data))
    .Subscribe(data => Console.WriteLine(data));
Normally the events come separated by a reasonable time frame.
Imagine a user updating a text box in a form. Our VeryExpensiveOperation
might take 5 seconds to complete, and while it runs an hourglass
is displayed on the screen.
However, if the user updates the text box again during those 5 seconds,
I want to send a cancellation to the current VeryExpensiveOperation
before the new one starts.
I would imagine a scenario like
source
    .SelectWithCancel((data, cancelToken) => VeryExpensiveOperation(data, cancelToken))
    .Subscribe(data => Console.WriteLine(data));
So every time the lambda is called, it is called with a cancelToken which can be used to manage cancelling a Task. However, now we are mixing Task, CancellationToken and Rx.
I'm not quite sure how to fit it all together. Any suggestions?
Bonus Points for figuring out how to test the operator using XUnit :)
FIRST ATTEMPT
public static IObservable<U> SelectWithCancelation<T, U>(this IObservable<T> This, Func<CancellationToken, T, Task<U>> fn)
{
    CancellationTokenSource tokenSource = new CancellationTokenSource();
    return This
        .ObserveOn(Scheduler.Default)
        .Select(v =>
        {
            tokenSource.Cancel();
            tokenSource = new CancellationTokenSource();
            return new { tokenSource.Token, v };
        })
        .SelectMany(o => Observable.FromAsync(() => fn(o.Token, o.v)));
}
Not tested yet. I'm hoping that a task that does not complete generates an IObservable that completes without firing any OnNext events.
You have to model VeryExpensiveOperation as a cancellable asynchronous thing: either a Task or an IObservable. I'll assume it is a Task with a CancellationToken:
Task<TResult> VeryExpensiveOperationAsync<TSource, TResult>(TSource item, CancellationToken token);
Then you do it like so:
source
    .Select(item => Observable.DeferAsync(async token =>
    {
        // do not yield the observable until after the operation is completed
        // (i.e. do not just do VeryExpensiveOperation(...).ToObservable())
        // because DeferAsync() will dispose of the token source as soon
        // as you provide the observable (instead of when the observable completes)
        var result = await VeryExpensiveOperationAsync(item, token);
        return Observable.Return(result);
    }))
    .Switch();
The Select just creates a deferred observable that, when subscribed, will create a token and kick off the operation. If the observable is unsubscribed before the operation finishes, the token will be cancelled.
The Switch subscribes to each new observable that comes out of Select, unsubscribing from the previous observable it was subscribed to.
This has the effect you want.
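The Select/Switch pattern above could be packaged as a reusable operator. The following is only a minimal sketch, assuming the System.Reactive package is referenced; the class name ObservableCancellationExtensions is hypothetical, and the (token, item) argument order simply follows the question's first attempt.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical container class for the operator; the name is an assumption.
public static class ObservableCancellationExtensions
{
    // Projects each item through an async selector, cancelling the
    // in-flight call whenever a newer item arrives.
    public static IObservable<TResult> SelectWithCancellation<TSource, TResult>(
        this IObservable<TSource> source,
        Func<CancellationToken, TSource, Task<TResult>> selector)
    {
        return source
            .Select(item => Observable.DeferAsync(async token =>
            {
                // Await first, then wrap the result, so the token stays
                // usable for the whole duration of the operation.
                var result = await selector(token, item);
                return Observable.Return(result);
            }))
            // Switch unsubscribes from the previous inner observable,
            // which signals that observable's token.
            .Switch();
    }
}
```

With this in place the call site would read source.SelectWithCancellation((token, data) => VeryExpensiveOperationAsync(data, token)).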
P.S. This is easily testable. Just provide a mock source and a mock VeryExpensiveOperation that uses a TaskCompletionSource provided by the unit test, so the unit test can control exactly when new source items are produced and when tasks are completed. Something like this:
void SomeTest()
{
    // create a test source where the values are how long
    // the mock operation should wait to do its work.
    var source = _testScheduler.CreateColdObservable<int>(...);
    // records the actions (whether they completed or canceled)
    List<bool> mockActionsCompleted = new List<bool>();
    var resultStream = source.SelectWithCancellation((token, delay) =>
    {
        var tcs = new TaskCompletionSource<string>();
        var tokenRegistration = new SingleAssignmentDisposable();
        // schedule an action to complete the task
        var d = _testScheduler.ScheduleRelative(delay, () =>
        {
            mockActionsCompleted.Add(true);
            tcs.SetResult("done " + delay);
            // stop listening to the token
            tokenRegistration.Dispose();
        });
        // listen to the token and cancel the task if the token signals
        tokenRegistration.Disposable = token.Register(() =>
        {
            mockActionsCompleted.Add(false);
            tcs.TrySetCanceled();
            // cancel the scheduled task
            d.Dispose();
        });
        return tcs.Task;
    });
    // subscribe to resultStream
    // start the scheduler
    // assert the mockActionsCompleted has the correct sequence
    // assert the results observed were what you expected.
}
You might run into trouble using testScheduler.Start() due to the new actions scheduled dynamically. A while loop with testScheduler.AdvanceBy(1) might work better.
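A stepping loop of that kind might look roughly like this. It is a sketch only, assuming the Microsoft.Reactive.Testing package; the helper name AdvanceInSteps and its untilTicks parameter are made up for illustration.

```csharp
using Microsoft.Reactive.Testing;

// Hypothetical helper class; the name is an assumption.
public static class TestSchedulerStepping
{
    // Advances virtual time one tick at a time until the clock reaches
    // untilTicks, so that work scheduled by earlier actions is picked up
    // on a later step.
    public static void AdvanceInSteps(TestScheduler scheduler, long untilTicks)
    {
        while (scheduler.Clock < untilTicks)
        {
            scheduler.AdvanceBy(1);
        }
    }
}
```

In the test above, this would stand in for the "start the scheduler" step, with untilTicks chosen to be later than the last expected completion.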