Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How do I get an IObservable to push the newest value upon subscription

Usually when you subscribe to the changes of a value you are also interested in knowing the initial value. I want my IObservable to cache the latest (or initial) value and push that value on subscription.

When using plain events I often end up with code that looks like

x.SomeEvent += SomeEventHandler;
SomeEventHandler(x, EventArgs.Empty);

Using IObservable I was hoping to wrap the event with something that pushes the initial value. If I have multiple subscribers they should receive the newest value upon subscription I have some code that works right if I subscribe right after creating the IObservable but not if the event fires before subscribing:

class Program
{
    static void Main()
    {
        var s = new Source { Value = 1 };
        var values = Observable.Return(s.Value).Concat(
            Observable.FromEvent(
                h => s.ValueChanged += h,
                h => s.ValueChanged -= h)
            .Select(_ => s.Value));
        using (values.Subscribe(Console.WriteLine))
        {
            s.Value = 2; // prints 1,2 as expected
        }
        using (values.Subscribe(Console.WriteLine))
        {
            s.Value = 3; // prints 1,3 - expected 2,3
        }
    }
}

class Source
{
    private int _value;
    public int Value
    {
        get { return _value; }
        set
        {
            if (_value == value)
                return;
            _value = value;
            if (ValueChanged != null)
                ValueChanged(this, EventArgs.Empty);
        }
    }

    public event EventHandler ValueChanged;
}

How do I create an IObservable that works as expected?

like image 974
Jesper Larsen-Ledet Avatar asked Nov 13 '10 14:11

Jesper Larsen-Ledet


People also ask

How do you change the value of an Observable?

The main idea here is that if you want to update the value of an Observable you have to do this inside the 'create' function. This will be possible if you declare a function inside this 'create' function. For this implementation, you only need one Service.

Can we make Observable synchronous?

Observables Can Emit Data and Notifications Synchronously Observables can also emit data and notifications synchronously. The observable function above produces data synchronously.

What happens when you subscribe to an Observable?

Observables are declarative —that is, you define a function for publishing values, but it is not executed until a consumer subscribes to it. The subscribed consumer then receives notifications until the function completes, or until they unsubscribe.


1 Answers

The solution is to subscribe a BehaviorSubject to the observable, and to subscribe all observers to the BehaviorSubject. The BehaviorSubject will remember the last notification and notify new observers of it upon subscription.

Have a look at the Observable.Publish extension method that has a initialValue parameter. This creates an IConnectableObservable that internally uses an BehaviorSubject.

var s = new Source { Value = 1 };

var values = Observable.FromEvent(h => s.ValueChanged += h,
                                  h => s.ValueChanged -= h)
                       .Select(e => e.NewValue)
                       .Publish(s.Value);

using (values.Connect())                         // subscribes subject to event
{
    using (values.Subscribe(Console.WriteLine))  // subscribes to subject
    {
        s.Value = 2;
    }                                            // unsubscribes from subject

    using (values.Subscribe(Console.WriteLine))  // subscribes to subject
    {
        s.Value = 3;
    }                                            // unsubscribes from subject

}                                            // unsubscribes subject from event

(untested)

like image 190
dtb Avatar answered Oct 11 '22 14:10

dtb