I want to expose an IObservable from my service layer.
For simplicity, let's say that internally the service layer receives Message objects from a remote server (via a socket), and that the socket library requires an object implementing IMessageResponse, which has a MessageReceived method, to be passed to it.
Internally, the service layer creates a MessageResponse object and gets notified through an Action callback when a message arrives.
Given this design, I need to be able to push new messages to the IObservable, but in the examples I've seen, Observable.XYZ doesn't seem to support a simple Send/Publish/Push method.
How do I wire up my Observable.XYZ in this scenario?
I want something like the code below. I know this is a very basic implementation of IObservable, but I wouldn't have thought I'd need to write this code myself; I expected something to be available out of the box.
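using System;
using System.Collections.Generic;
using System.Reactive.Disposables;

public class PushObservable<T> : IObservable<T>
{
    private readonly IList<IObserver<T>> _listeners = new List<IObserver<T>>();

    // Push a value to every current subscriber (not thread-safe).
    public void Send(T value)
    {
        foreach (var listener in _listeners)
            listener.OnNext(value);
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        _listeners.Add(observer);
        // Disposing the returned handle unsubscribes the observer.
        return Disposable.Create(() => _listeners.Remove(observer));
    }
}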
You have rewritten an object that already exists! Your "PushObservable" is actually Subject<T>, and it's one of the fundamental objects in Rx.
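As a rough sketch of how that might look in the service layer described in the question: the Message, IMessageResponse, MessageResponse, and MessageService types below are hypothetical stand-ins (the real socket library's interface may differ), and only Subject<T>, OnNext, and AsObservable come from Rx itself.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical stand-ins for the types mentioned in the question.
public class Message
{
    public string Text { get; set; }
}

public interface IMessageResponse
{
    void MessageReceived(Message message);
}

public class MessageResponse : IMessageResponse
{
    private readonly Action<Message> _callback;

    public MessageResponse(Action<Message> callback)
    {
        _callback = callback;
    }

    // Called by the socket library; forwards to the service's callback.
    public void MessageReceived(Message message)
    {
        _callback(message);
    }
}

public class MessageService : IDisposable
{
    // The Subject is both an IObserver<Message> (we push into it)
    // and an IObservable<Message> (callers subscribe to it).
    private readonly Subject<Message> _messages = new Subject<Message>();

    public MessageService()
    {
        // Each incoming message is pushed straight into the subject.
        Response = new MessageResponse(_messages.OnNext);
    }

    // The object that would be handed to the socket library.
    public IMessageResponse Response { get; }

    // AsObservable hides the Subject so callers cannot call OnNext themselves.
    public IObservable<Message> Messages => _messages.AsObservable();

    public void Dispose()
    {
        _messages.Dispose();
    }
}
```

A caller would then subscribe with something like service.Messages.Subscribe(m => Console.WriteLine(m.Text)), and every call the socket library makes to Response.MessageReceived shows up as an OnNext on that subscription.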
If you really want to think about this problem in an Rx way, you'd probably start with an IObservable<byte[]> that comes from the socket, then you would Select this into an IObservable<IMessageResponse>, since at the end of the day, the event you're responding to is bytes coming off the wire.
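A minimal sketch of that projection, assuming each byte[] happens to hold exactly one complete UTF-8 encoded message (a real socket delivers arbitrary chunks, so some framing step would be needed first). TextResponse, the empty IMessageResponse, and ToResponses are hypothetical names used only for illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Text;

// Hypothetical stand-in for the response type from the question.
public interface IMessageResponse { }

public sealed class TextResponse : IMessageResponse
{
    public TextResponse(string text)
    {
        Text = text;
    }

    public string Text { get; }
}

public static class SocketObservableExtensions
{
    // Projects raw buffers into response objects with Select.
    // Assumption: one buffer == one whole message, UTF-8 encoded.
    public static IObservable<IMessageResponse> ToResponses(
        this IObservable<byte[]> rawBytes)
    {
        return rawBytes.Select(
            bytes => (IMessageResponse)new TextResponse(Encoding.UTF8.GetString(bytes)));
    }
}
```

The source of the IObservable<byte[]> is left open here; during testing it could simply be a Subject<byte[]> that you push buffers into by hand.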