Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to implement an IObserver with async/await OnNext/OnError/OnCompleted methods?

Tags:

I'm trying to write an extension method for System.Net.WebSocket that will turn it into an IObserver using Reactive Extensions (Rx.NET). You can see the code below:

public static IObserver<T> ToObserver<T>(this WebSocket webSocket, IWebSocketMessageSerializer<T> webSocketMessageSerializer) {     // Wrap the web socket in an interface that's a little easier to manage     var webSocketMessageStream = new WebSocketMessageStream(webSocket);      // Create the output stream to the client     return Observer.Create<T>(         onNext:      async message => await webSocketMessageStream.WriteMessageAsync(webSocketMessageSerializer.SerializeMessage(message)),         onError:     async error   => await webSocketMessageStream.CloseAsync(WebSocketCloseStatus.InternalServerError, string.Format("{0}: {1}", error.GetType(), error.Message)),         onCompleted: async ()      => await webSocketMessageStream.CloseAsync(WebSocketCloseStatus.NormalClosure, "Server disconnected")     ); } 

This code works, but I am concerned about the use of async/await inside of the onNext, onError, and onCompleted lambdas. I know that this returns an async void lambda, which is frowned upon (and sometimes causes issues that I've already run into myself).

I've been reading up on the Rx.NET documentation as well as blog posts across the internet and I cannot seem to find the proper way (if there is one) to use an async/await method in an IObserver. Is there a proper way to do this? If not, then how can I work around this problem?

like image 932
Kevin Craft Avatar asked Feb 03 '16 17:02

Kevin Craft


1 Answers

Subscribe does not take async methods. So what happens here is you are using a fire-and-forget mechanism from async void. The problem is that onNext messages will no longer be serialized.

Instead of calling an async method inside Subscribe, you should wrap it into the pipeline to allow Rx to wait for it.

It's okay to use fire-and-forget on onError and onCompleted because these are guaranteed to be the last thing called from the Observable. Do keep in mind that resources associated with the Observable can be cleaned up after onError and onCompleted returned, before they completed.

I wrote a small extension method that does all this:

    public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNext, Action<Exception> onError, Action onCompleted)     {         return source.Select(e => Observable.Defer(() => onNext(e).ToObservable())).Concat()             .Subscribe(             e => { }, // empty             onError,             onCompleted);     } 

First we convert the async onNext method into an Observable, bridging the two async worlds. This means async/await inside onNext will be respected. Next we wrap the method into Defer so it wont start right when it is created. Instead Concat will call the next defered observable only when the previous one finished.

Ps. I have hope the next release of Rx.Net will have async support in subscribe.

like image 112
Dorus Avatar answered Sep 29 '22 12:09

Dorus