I have a method returning an IObservable<long> that I call from an async method. I want to convert this into a normal List<long>, but have that operation be cancelled if my CancellationToken is signaled.
I would like to do something like this:
List<long> result = await Foo().ToList(myCancellationToken);
What is the correct (and simplest) way to accomplish this? The ToList() extension method of IObservable<T> returns an IObservable<IList<T>> and does not take a CancellationToken parameter.
var list = await Foo().ToList().ToTask(cancellationToken);
This has the advantage of cancelling immediately if the token is cancelled (the other answer will not cancel until the next time the Foo observable produces a value).
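For context, here is a minimal sketch of how the ToTask approach might be wrapped, assuming the System.Reactive package is referenced. The class name, the method name ToListOrEmptyAsync, and the choice to return an empty list on cancellation are all illustrative assumptions, not part of the answer above. When the token is cancelled, the awaited task is cancelled and the await throws, so any items already received are lost.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

public static class ObservableListExample
{
    // Hypothetical helper; the name and the empty-list fallback are illustrative choices.
    public static async Task<IList<long>> ToListOrEmptyAsync(
        IObservable<long> source, CancellationToken cancellationToken)
    {
        try
        {
            // ToList() yields a single IList<long> once the source completes;
            // ToTask(token) exposes it as a Task that is cancelled with the token.
            return await source.ToList().ToTask(cancellationToken);
        }
        catch (OperationCanceledException)
        {
            // Cancellation surfaces as an exception on await;
            // items observed before cancellation are discarded.
            return Array.Empty<long>();
        }
    }
}
```

A caller that would rather let the cancellation propagate can drop the try/catch and await the task directly, as in the one-liner above.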
Use TakeWhile to terminate the list.
CancellationToken MyToken = ...
var list = await Foo().TakeWhile(v=>!MyToken.IsCancellationRequested).ToList();
If you are worried that the subscription is only cancelled when the next item arrives, you can use this extension method:
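public static IObservable<T> TakeWhile<T>(this IObservable<T> source, CancellationToken token)
{
    // Link to the caller's token so the subscription can be stopped either
    // by the caller's token or by disposing the returned subscription.
    var cts = CancellationTokenSource.CreateLinkedTokenSource(token);
    return Observable.Create<T>(observer =>
    {
        // Unsubscribes from the source as soon as the linked token is cancelled.
        source.Subscribe(observer, cts.Token);
        // Disposing the outer subscription cancels the linked token.
        return Disposable.Create(() => cts.Cancel());
    });
}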
and write
CancellationToken MyToken = ...
var list = await Foo().TakeWhile(MyToken).ToList();
Using TakeWhile with a cancellation token is more composable than ToTask, which just returns the last element of the sequence.
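One caveat worth checking: as far as I can tell, Subscribe(observer, token) only unsubscribes when the token is cancelled and does not signal completion, so an awaited ToList() downstream may never finish. A possible alternative, sketched here under the assumption that System.Reactive is referenced, is to build a signal sequence from the token and pass it to the built-in TakeUntil operator, which completes the result when the signal emits. The name TakeUntilCancelled is hypothetical and not an operator shipped with the library.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;

public static class CancellationObservableExtensions
{
    // Hypothetical name; this is not an operator shipped with System.Reactive.
    public static IObservable<T> TakeUntilCancelled<T>(
        this IObservable<T> source, CancellationToken token)
    {
        // A signal sequence that emits one value when the token is cancelled.
        IObservable<Unit> cancelled = Observable.Create<Unit>(observer =>
        {
            // Disposing the registration removes the callback from the token.
            IDisposable registration = token.Register(() => observer.OnNext(Unit.Default));
            return registration;
        });

        // TakeUntil completes the resulting sequence once the signal emits.
        return source.TakeUntil(cancelled);
    }
}
```

With this sketch, await Foo().TakeUntilCancelled(MyToken).ToList() should return the items collected before cancellation instead of throwing, which is a different contract from the ToTask answer.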