Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Converting an async method to return IObservable<>

I have an async method that is a long-running method that reads a stream and when it finds something fires an event:

public static async void GetStream(int id, CancellationToken token)

It takes a cancellation token because it is create in a new task. Internally it calls await when it reads a stream:

var result = await sr.ReadLineAsync()

Now, I want to convert this to a method that returns an IObservable<> so that I can use this with the reactive extensions. From what I've read, the best way to do this is using Observable.Create, and since RX 2.0 now also supports async I can get it all to work with something like this:

public static IObservable<Message> ObservableStream(int id, CancellationToken token)
{
    return Observable.Create<Message>(
        async (IObserver<Message> observer) =>
            {

The rest of the code inside is the same, but instead of firing events I'm calling observer.OnNext(). But, this feels wrong. For one thing I'm mixing CancellationTokens up in there, and although adding the async keyword made it work, is this actually the best thing to do? I'm calling my ObservableStream like this:

Client.ObservableStream(555404, token).ObserveOn(Dispatcher.CurrentDispatcher).SubscribeOn(TaskPoolScheduler.Default).Subscribe(m => Messages.Add(m));
like image 816
Matt Roberts Avatar asked Jul 04 '13 08:07

Matt Roberts


2 Answers

You are correct. Once you represent your interface through an IObservable, you should avoid requiring the callers to supply a CancellationToken. That doesn't mean you cannot use them internally. Rx provides several mechanisms to produce CancellationToken instances which are canceled when the observer unsubscribes from your observable.

There are a number of ways to tackle your problem. The simplest requires almost no changes in your code. It uses an overload of Observable.Create which supplies you with a CancellationToken that triggers if the caller unsubscribes:

public static IObservable<Message> ObservableStream(int id)
{
    return Observable.Create<Message>(async (observer, token) =>
    {
         // no exception handling required.  If this method throws,
         // Rx will catch it and call observer.OnError() for us.
         using (var stream = /*...open your stream...*/)
         {
             string msg;
             while ((msg = await stream.ReadLineAsync()) != null)
             {
                 if (token.IsCancellationRequested) { return; }
                 observer.OnNext(msg);
             }
             observer.OnCompleted();
         }
    });
}
like image 75
Brandon Avatar answered Nov 14 '22 17:11

Brandon


You should change GetStream to return a Task, instead of void (returning async void is not good, except when absolutely required, as svick commented). Once you return a Task, you can just call .ToObservable() and you are done.

For example:

public static async Task<int> GetStream(int id, CancellationToken token) { ... }

Then,

GetStream(1, new CancellationToken(false))
   .ToObservable()
   .Subscribe(Console.Write);
like image 35
Richard Anthony Freeman-Hein Avatar answered Nov 14 '22 17:11

Richard Anthony Freeman-Hein