Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Is it the best to implement ObservableBase in this situation or is there another way?

First of all, I didn't find a good example of custom implementation of the ObservableBase or AnonymousObservable. I have no idea which one I need to implement in my case if any. The situation is this.

I use a third-party library and there is a class let's call it Producer which allows me to set a delegate on it like objProducer.Attach(MyHandler). MyHandler will receive messages from the Producer. I'm trying to create a wrapper around the Producer to make it observable and ideally to be it a distinct type instead of creating just an instance of observable (like Observable.Create).

EDITED: Third-party Producer has the following interface

public delegate void ProducerMessageHandler(Message objMessage);
public class Producer : IDisposable {
   public void Start();
   public void Attach(ProducerMessageHandler fnHandler);
   public void Dispose();
}

as I mentioned I have no control over the source code of it. It is intended to be used like this: create an instance, call Attach and pass a delegate, call Start which basically initiates receiving messages inside the provided delegated when Producer receives them or generates them.

I was thinking about creating public class ProducerObservable : ObservableBase<Message> so that when somebody subscribes to it I would (Rx library would) push messages to the observers. It seems that I need to call Attach somewhere in the constructor of my ProducerObservable, then I need somehow to call OnNext on the observers attached to it. Does it mean that I have to code all this: add a list of observers LinkedList<IObserver<Message>> to the class and then add observers when SubscribeCore abstract method is called on the ProducerObservable? Then apparently I would be able to enumerate the LinkedList<IObserver<Message>> in the MyHandler and call OnNext for each one. All these looks feasible but it doesn't feel exactly right. I would expect .net reactive extensions to be better prepare to situations like this and have at least the implementation of the LinkedList<IObserver<Message>> ready somewhere in base class.

like image 640
alex.49.98 Avatar asked Nov 01 '22 02:11

alex.49.98


1 Answers

In code that uses Rx, "Producer" objects are usually objects that expose IObservable<T> instances via public properties or methods. It is less common that the Producer class itself would implement IObservable<T>, and when it does, it does so by using Rx to do the heavy lifting under the hood. You absolutely never want to implement an IObservable<T> yourself.

Here's an example where the observable is exposed as a property:

public class Producer
{
    public Producer(ThirdPartyLib.Producer p)
    {
        var c = Observable.Create(observer =>
        {
            ProducerMessageHandler h = msg => observer.OnNext(msg);
            p.Attach(h);
            p.Start();

            return Disposable.Empty;
        }).Publish();

        // Connect the observable the first time someone starts
        // observing
        Stream = Observable.Create(observer =>
        {
            var subscription = c.Subscribe(observer);
            if (Interlocked.Exchange(ref _connected, 1) == 0)
            {
                c.Connect();
            }

            return subscription;
        });
    }

    private int _connected;
    public IObservable<Message> Stream { get; private set; }
}

And here is the same example where we actually implement IObservable<T> by delegating to Rx:

public class Producer : IObservable<Message>
{
    public Producer(ThirdPartyLib.Producer p)
    {
        var c = Observable.Create(observer =>
        {
            ProducerMessageHandler h = msg => observer.OnNext(msg);
            p.Attach(h);
            p.Start();

            return Disposable.Empty;
        }).Publish();

        // Connect the observable the first time someone starts
        // observing
        _stream = Observable.Create(observer =>
        {
            var subscription = c.Subscribe(observer);
            if (Interlocked.Exchange(ref _connected, 1) == 0)
            {
                c.Connect();
            }

            return subscription;
        });
    }

    private IObservable<Message> _stream;

    // implement IObservable<T> by delegating to Rx
    public IDisposable Subscribe(IObserver<Message> observer)
    {
        return _stream.Subscribe(observer);
    }
}
like image 78
Brandon Avatar answered Nov 15 '22 02:11

Brandon