First of all, I didn't find a good example of custom implementation of the ObservableBase or AnonymousObservable. I have no idea which one I need to implement in my case if any. The situation is this.
I use a third-party library and there is a class let's call it Producer which allows me to set a delegate on it like objProducer.Attach(MyHandler). MyHandler will receive messages from the Producer. I'm trying to create a wrapper around the Producer to make it observable and ideally to be it a distinct type instead of creating just an instance of observable (like Observable.Create).
EDITED: Third-party Producer has the following interface
public delegate void ProducerMessageHandler(Message objMessage);
public class Producer : IDisposable {
public void Start();
public void Attach(ProducerMessageHandler fnHandler);
public void Dispose();
}
as I mentioned I have no control over the source code of it. It is intended to be used like this: create an instance, call Attach and pass a delegate, call Start which basically initiates receiving messages inside the provided delegated when Producer receives them or generates them.
I was thinking about creating public class ProducerObservable : ObservableBase<Message>
so that when somebody subscribes to it I would (Rx library would) push messages to the observers. It seems that I need to call Attach somewhere in the constructor of my ProducerObservable, then I need somehow to call OnNext on the observers attached to it. Does it mean that I have to code all this: add a list of observers LinkedList<IObserver<Message>>
to the class and then add observers when SubscribeCore abstract method is called on the ProducerObservable? Then apparently I would be able to enumerate the LinkedList<IObserver<Message>>
in the MyHandler and call OnNext for each one. All these looks feasible but it doesn't feel exactly right. I would expect .net reactive extensions to be better prepare to situations like this and have at least the implementation of the LinkedList<IObserver<Message>>
ready somewhere in base class.
In code that uses Rx, "Producer" objects are usually objects that expose IObservable<T>
instances via public properties or methods. It is less common that the Producer
class itself would implement IObservable<T>
, and when it does, it does so by using Rx
to do the heavy lifting under the hood. You absolutely never want to implement an IObservable<T>
yourself.
Here's an example where the observable is exposed as a property:
public class Producer
{
public Producer(ThirdPartyLib.Producer p)
{
var c = Observable.Create(observer =>
{
ProducerMessageHandler h = msg => observer.OnNext(msg);
p.Attach(h);
p.Start();
return Disposable.Empty;
}).Publish();
// Connect the observable the first time someone starts
// observing
Stream = Observable.Create(observer =>
{
var subscription = c.Subscribe(observer);
if (Interlocked.Exchange(ref _connected, 1) == 0)
{
c.Connect();
}
return subscription;
});
}
private int _connected;
public IObservable<Message> Stream { get; private set; }
}
And here is the same example where we actually implement IObservable<T>
by delegating to Rx:
public class Producer : IObservable<Message>
{
public Producer(ThirdPartyLib.Producer p)
{
var c = Observable.Create(observer =>
{
ProducerMessageHandler h = msg => observer.OnNext(msg);
p.Attach(h);
p.Start();
return Disposable.Empty;
}).Publish();
// Connect the observable the first time someone starts
// observing
_stream = Observable.Create(observer =>
{
var subscription = c.Subscribe(observer);
if (Interlocked.Exchange(ref _connected, 1) == 0)
{
c.Connect();
}
return subscription;
});
}
private IObservable<Message> _stream;
// implement IObservable<T> by delegating to Rx
public IDisposable Subscribe(IObserver<Message> observer)
{
return _stream.Subscribe(observer);
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With