Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

IObservable - How to Send/Publish/Push new values to collection

I want to expose an IObservable from my service layer.

For simplicity lets say that internally the service layer is getting Message from a remote server (via a socket) and that the socket library requires an object of IMessageReponse that has a MessageReceived method to be passed to it.

Internally the service layer creates a MessageResponse object and get notified by a Action callback when a message arrives.

Given this design I need to be able to push new messages to the IObservable but in any of the examples I've seen, Observable.XYZ doesn't seem to support a simple Send/Publish/Push method...

How do I wireup my Observable.XYZ in this scenario???

I want something like this... note I know this is a very basic implementation of IObservable, but I wouldn't have thought I would need to write this code myself... I would have thought that something would have been there for me out of the box.

public class PushObservable<T> : IObservable<T>
{
    private IList<IObserver<T>> _listeners = new List<IObserver<T>>();

    public void Send(T value)
    {
        foreach (var listener in _listeners) 
            listener.OnNext(value); 
    }

    public IDisposable Subscribe(IObserver<T> observer)
    { 
        _listeners.Add(observer);
    }
}
like image 686
vdh_ant Avatar asked Mar 01 '11 22:03

vdh_ant


1 Answers

You have rewritten an object that already exists! Your "PushObservable" is actually Subject<T>, and it's one of the fundamental objects in Rx.

If you really want to think about this problem in an Rx way, you'd probably start with an IObservable<byte[]> that comes from the socket, then you would Select this into an IObservable<IMessageResponse>, since at the end of the day, the event you're responding to is bytes coming off the wire.

like image 69
Ana Betts Avatar answered Oct 18 '22 10:10

Ana Betts