I'm trying to write an extension method for System.Net.WebSocket that will turn it into an IObserver using Reactive Extensions (Rx.NET). You can see the code below:
public static IObserver<T> ToObserver<T>(this WebSocket webSocket, IWebSocketMessageSerializer<T> webSocketMessageSerializer) { // Wrap the web socket in an interface that's a little easier to manage var webSocketMessageStream = new WebSocketMessageStream(webSocket); // Create the output stream to the client return Observer.Create<T>( onNext: async message => await webSocketMessageStream.WriteMessageAsync(webSocketMessageSerializer.SerializeMessage(message)), onError: async error => await webSocketMessageStream.CloseAsync(WebSocketCloseStatus.InternalServerError, string.Format("{0}: {1}", error.GetType(), error.Message)), onCompleted: async () => await webSocketMessageStream.CloseAsync(WebSocketCloseStatus.NormalClosure, "Server disconnected") ); }
This code works, but I am concerned about the use of async/await inside of the onNext, onError, and onCompleted lambdas. I know that this returns an async void lambda, which is frowned upon (and sometimes causes issues that I've already run into myself).
I've been reading up on the Rx.NET documentation as well as blog posts across the internet and I cannot seem to find the proper way (if there is one) to use an async/await method in an IObserver. Is there a proper way to do this? If not, then how can I work around this problem?
Subscribe does not take async
methods. So what happens here is you are using a fire-and-forget mechanism from async void
. The problem is that onNext messages will no longer be serialized.
Instead of calling an async
method inside Subscribe, you should wrap it into the pipeline to allow Rx to wait for it.
It's okay to use fire-and-forget on onError
and onCompleted
because these are guaranteed to be the last thing called from the Observable
. Do keep in mind that resources associated with the Observable
can be cleaned up after onError
and onCompleted
returned, before they completed.
I wrote a small extension method that does all this:
public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNext, Action<Exception> onError, Action onCompleted) { return source.Select(e => Observable.Defer(() => onNext(e).ToObservable())).Concat() .Subscribe( e => { }, // empty onError, onCompleted); }
First we convert the async onNext
method into an Observable
, bridging the two async worlds. This means async/await inside onNext
will be respected. Next we wrap the method into Defer
so it wont start right when it is created. Instead Concat
will call the next defered observable only when the previous one finished.
Ps. I have hope the next release of Rx.Net will have async
support in subscribe.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With