Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Consuming shared IObservable from SignalR

Lets say that I have an IObservable<Something> serverside, stored in a static field or whatever.

Lets also assume that I have an SignalR hub that has a Subscribe method and an signalR client that has a notify function.

public static IObservable<string> Events;
protected void Application_Start()
{
   //dummy observable just to generate events for me..
   Events = Observable
       .Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(50))
       .Select(l => l.ToString());
...snip..
}

and:

public class MyHub1 : Hub
{        
    public void Subscribe()
    {
        Clients.All.notify("start");
        WebApiApplication.Events
            .Subscribe(s => Clients.Caller.notify(s));
    }
}

and:

myHub.client.notify = function (event) {
     console.info(event);
};

What do I need to do to make the observable shared among all clients? That is, I want each connecting client to receive say the last 200 events once they subscribe. and then each client should se the same event ticking away in realtime.

I suppose the Replay method of IObservable should be used somehow. I want this to behave pretty much like a chat where the user gets to see the last x messages + every new event in realtime.

aside from how to actually composing the observable query, what are the best way to store and set up the shared event stream in Asp.NET?

like image 927
Roger Johansson Avatar asked May 30 '14 14:05

Roger Johansson


1 Answers

This is a very common architectural problem - combining a live stream with a "state of the world". What you want to do is leverage SignalR to broadcast the live messages to current subscribers (what it is good at), and have a separate API call for joining clients to get historical messages.

In the clients, you supply logic that subscribes to the live SignalR message stream first and then requests the history of messages that have already occurred (the "state of the world") - typically best pulled back as a simple ordered list.

There is an inherent race condition that can result in receiving a message in the live stream and in the history - so you must take care to de-duplicate or "de-dupe" your message list.

How you persist messages can then be handled as an entirely separate problem.

This decoupling of the history and live stream affords flexibility in how you deal with both and offers opportunities for efficiencies like paging into history rather than grabbing the whole thing, for example.

There are a few questions and answers that talk about ways to leverage Rx to combine the history and live data - you would need to do this in the client-side javascript, and I'm not great on rx js.

Have a look at Merging historical and live stock price data with Rx for some discussion on this problem, and also Are these two Observable Operations Equivalent? - I have some sample code in the latter case which is a purely .NET scenario.

like image 173
James World Avatar answered Nov 09 '22 23:11

James World