
Observing incoming websocket messages with Reactive Extensions?

I want to use LINQ to process events received over a WebSocket connection. This is what I have so far:

    private static void Main()
    {
        string WsEndpoint = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:quicktesting";
        using (WebSocket ws = new WebSocket(WsEndpoint))
        {
            ws.OnMessage += Ws_OnMessage;

            ws.Connect();
            Console.ReadKey();
            ws.Close();
        }
    }

    private static void Ws_OnMessage(object sender, MessageEventArgs e)
    {
        Console.WriteLine(e.Data);
    }

The first thing that stumps me is how to turn ws.OnMessage into some sort of event stream. I cannot find any examples online of observing an external event source with Reactive Extensions. I intend to parse the messages into JSON objects, then filter and aggregate them.

Could someone provide an example of creating an observable from the websocket messages, and subscribing to it?


Edit: Final Working Code

The only difference from the chosen answer is that I initialized the websocket before passing it into Observable.Using:

//-------------------------------------------------------
// Create websocket connection
//-------------------------------------------------------
const string wsEndpoint = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:quicktesting";
WebSocket socket = new WebSocket(wsEndpoint);


//-------------------------------------------------------
// Create an observable by wrapping ws.OnMessage
//-------------------------------------------------------
var globalEventStream = Observable
    .Using(
        () => socket,
        ws =>
            Observable
                .FromEventPattern<EventHandler<MessageEventArgs>, MessageEventArgs>(
                    handler => ws.OnMessage += handler,
                    handler => ws.OnMessage -= handler));
//---------------------------------------------------------
// Subscribe to globalEventStream
//---------------------------------------------------------

IDisposable subscription = globalEventStream.Subscribe(ep =>
{
    Console.WriteLine("Event Received");
    Console.WriteLine(ep.EventArgs.Data);
});

//----------------------------------------------------------
// Send message over websocket
//----------------------------------------------------------
socket.Connect();
socket.Send("test message");
// When finished, close the connection.
socket.Close();
mooglinux asked Aug 24 '16 00:08


1 Answer

You should set up your observable like this:

    var observable =
        Observable
            .Using(
                () => new WebSocket(WsEndpoint),
                ws =>
                    Observable
                        .FromEventPattern<EventHandler<MessageEventArgs>, MessageEventArgs>(
                            handler => ws.OnMessage += handler,
                            handler => ws.OnMessage -= handler));

This creates the socket and attaches the event handler only when the observable is subscribed to. When the subscription is disposed, it detaches the handler and disposes of the socket.
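To make that lifetime behaviour visible, here is a minimal sketch that swaps the socket for a stand-in resource. It assumes the System.Reactive package; the Subject, the class name UsingLifetimeDemo and the log strings are hypothetical and exist only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class UsingLifetimeDemo
{
    // Returns the lifecycle events in the order they happened.
    public static List<string> Run()
    {
        var log = new List<string>();

        // Hypothetical stand-in for the socket's OnMessage event.
        var messages = new Subject<string>();

        IObservable<string> observable = Observable.Using<string, IDisposable>(
            () =>
            {
                // Stand-in for "new WebSocket(...)": runs once per subscription.
                log.Add("resource created");
                return Disposable.Create(() => log.Add("resource disposed"));
            },
            _ => messages);

        log.Add("before subscribe");
        IDisposable subscription = observable.Subscribe(m => log.Add("got " + m));

        messages.OnNext("hello");
        subscription.Dispose();
        messages.OnNext("ignored"); // no subscriber is attached any more

        return log;
    }
}
```

Printing the result, for example with Console.WriteLine(string.Join(", ", UsingLifetimeDemo.Run())), should show "before subscribe" ahead of "resource created", and "resource disposed" as the last entry.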


The type of observable is IObservable<EventPattern<MessageEventArgs>>. You can consume it like this:

IDisposable subscription = observable.Subscribe(ep =>
{
    Console.WriteLine(ep.EventArgs.Data);
});
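Since the question mentions parsing the messages as JSON and then filtering and aggregating them, the following is a rough sketch of what such a pipeline might look like with the standard Rx operators Select, Where and Scan. It assumes System.Reactive and System.Text.Json. The Subject stands in for the socket stream, and the "type" field and its values are assumptions made for illustration, not the documented message schema.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text.Json;

public static class MessagePipelineDemo
{
    // Reads the "type" field of a JSON object, or returns "" when it is missing.
    // The field name is an assumption made for this example.
    private static string ReadType(string json)
    {
        using (JsonDocument doc = JsonDocument.Parse(json))
        {
            JsonElement root = doc.RootElement;
            if (root.ValueKind == JsonValueKind.Object
                && root.TryGetProperty("type", out JsonElement type)
                && type.ValueKind == JsonValueKind.String)
            {
                return type.GetString() ?? "";
            }
            return "";
        }
    }

    // Returns the running count emitted after each matching message.
    public static List<int> Run()
    {
        var counts = new List<int>();

        // Hypothetical stand-in for observable.Select(ep => ep.EventArgs.Data).
        var rawMessages = new Subject<string>();

        IObservable<int> heartbeatCount = rawMessages
            .Select(json => ReadType(json))            // parse
            .Where(type => type == "heartbeat")        // filter
            .Scan(0, (count, _) => count + 1);         // aggregate (running count)

        using (heartbeatCount.Subscribe(n => counts.Add(n)))
        {
            rawMessages.OnNext("{\"type\":\"heartbeat\"}");
            rawMessages.OnNext("{\"type\":\"serviceMessage\"}");
            rawMessages.OnNext("{\"type\":\"heartbeat\"}");
        }

        return counts;
    }
}
```

Against the real socket, the Subject would be replaced by observable.Select(ep => ep.EventArgs.Data). Scan emits a value after every matching message; Aggregate could be used instead if only a final total on completion is wanted.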

Thanks for posting the NuGet reference.

Here's the working code:

const string WsEndpoint = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:quicktesting";

Console.WriteLine("Defining Observable:");

IObservable<EventPattern<WebSocketSharp.MessageEventArgs>> observable =
    Observable
        .Using(
            () =>
            {
                var ws = new WebSocketSharp.WebSocket(WsEndpoint);
                ws.Connect();
                return ws;
            },
            ws =>
                Observable
                    .FromEventPattern<EventHandler<WebSocketSharp.MessageEventArgs>, WebSocketSharp.MessageEventArgs>(
                        handler => ws.OnMessage += handler,
                        handler => ws.OnMessage -= handler));

Console.WriteLine("Subscribing to Observable:");

IDisposable subscription = observable.Subscribe(ep =>
{
    Console.WriteLine("Event Received");
    Console.WriteLine(ep.EventArgs.Data);
});

Console.WriteLine("Writing to Source:");

using (var source = new WebSocketSharp.WebSocket(WsEndpoint))
{
    source.Connect();
    source.Send("test");
}
Enigmativity answered Oct 19 '22 06:10