
How to convert IObservable<T> to list in RX with CancellationToken support

I have a method returning an IObservable<long> that I call from an async method. I want to convert this into a normal List<long>, but to have that operation be cancelled if my CancellationToken is signaled.

I would like to do something like this:

List<long> result = await Foo().ToList(myCancellationToken);

What is the correct (and simplest) way to accomplish this? The ToList() extension method of IObservable<T> returns an IObservable<IList<T>> and does not take a CancellationToken parameter.

DeCaf asked Dec 21 '22 01:12



2 Answers

var list = await Foo().ToList().ToTask(cancellationToken);

This has the advantage of cancelling immediately when the token is cancelled (the other answer will not cancel until the next time the Foo observable produces a value).
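
For context, here is a fuller sketch of this approach. Foo() is a hypothetical stand-in built on Observable.Interval, and the 350 ms timeout is arbitrary. Two things to note: ToList() yields an IList<long>, and cancelling the token makes the awaited task throw an OperationCanceledException rather than return a partial list.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks; // ToTask
using System.Threading;
using System.Threading.Tasks;

public static class ToTaskDemo
{
    // Hypothetical stand-in for Foo(): emits 0, 1, 2, ... every 100 ms and never completes.
    private static IObservable<long> Foo() =>
        Observable.Interval(TimeSpan.FromMilliseconds(100));

    public static async Task<string> RunAsync()
    {
        // Assumption for the demo: cancel automatically after 350 ms.
        using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(350)))
        {
            try
            {
                IList<long> list = await Foo().ToList().ToTask(cts.Token);
                return $"completed with {list.Count} items";
            }
            catch (OperationCanceledException)
            {
                // The items buffered so far are not returned.
                return "cancelled";
            }
        }
    }

    public static async Task Main() => Console.WriteLine(await RunAsync());
}
```

If you need a List<long> rather than an IList<long>, copying the result with new List<long>(list) is one option.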

Brandon answered Feb 12 '23 11:02



Use TakeWhile to terminate the list.

CancellationToken MyToken = ...
var list = await Foo().TakeWhile(v=>!MyToken.IsCancellationRequested).ToList();

If you are worried that the subscription is only cancelled when the next item arrives, you can use this extension method.

public static IObservable<T> TakeWhile<T>(this IObservable<T> This, CancellationToken t)
{
    return Observable.Create<T>(observer =>
    {
        // Create the linked source per subscription; sharing one across
        // subscriptions would leave it cancelled after the first unsubscribe.
        var cts = CancellationTokenSource.CreateLinkedTokenSource(t);
        This.Subscribe(observer, cts.Token);
        return Disposable.Create(() => cts.Cancel());
    });
}

and write

CancellationToken MyToken = ...
var list = await Foo().TakeWhile(MyToken).ToList();

Using TakeWhile with a cancellation token is more composable than ToTask, which returns only the last element of the sequence.
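
One caveat worth checking: as far as I can tell, Subscribe(observer, token) only disposes the subscription when the token is cancelled and does not call OnCompleted, so an awaited ToList() may never finish. A minimal sketch of an alternative follows. It swaps in TakeUntil, so cancellation completes the sequence and the items collected so far are returned. TakeUntilCancelled and Foo() are hypothetical names used for illustration, not part of System.Reactive.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationExtensions
{
    // Hypothetical helper: ends the sequence with OnCompleted once the token is cancelled.
    public static IObservable<T> TakeUntilCancelled<T>(
        this IObservable<T> source, CancellationToken token)
    {
        // Adapts the token to an observable that emits one value on cancellation.
        // The registration is disposed together with the subscription.
        IObservable<Unit> cancelled = Observable.Create<Unit>(observer =>
        {
            IDisposable registration = token.Register(() => observer.OnNext(Unit.Default));
            return registration;
        });
        return source.TakeUntil(cancelled);
    }
}

public static class TakeUntilDemo
{
    // Hypothetical stand-in for Foo(): emits every 100 ms and never completes.
    private static IObservable<long> Foo() =>
        Observable.Interval(TimeSpan.FromMilliseconds(100));

    public static async Task<IList<long>> RunAsync()
    {
        // Assumption for the demo: cancel automatically after 350 ms.
        using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(350)))
        {
            // Awaiting the observable returns the partial list; no exception is thrown.
            return await Foo().TakeUntilCancelled(cts.Token).ToList();
        }
    }

    public static async Task Main()
    {
        IList<long> list = await RunAsync();
        Console.WriteLine($"Collected {list.Count} items before cancellation");
    }
}
```

The trade-off against ToTask(token) is that the caller gets a partial list instead of an exception, so it has to inspect the token itself if it needs to know whether the sequence was cut short.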

bradgonesurfing answered Feb 12 '23 09:02
