Usually when you subscribe to the changes of a value you are also interested in knowing the initial value. I want my IObservable to cache the latest (or initial) value and push that value on subscription.
When using plain events I often end up with code that looks like
x.SomeEvent += SomeEventHandler;
SomeEventHandler(x, EventArgs.Empty);
Using IObservable I was hoping to wrap the event with something that pushes the initial value. If I have multiple subscribers they should receive the newest value upon subscription I have some code that works right if I subscribe right after creating the IObservable but not if the event fires before subscribing:
class Program
{
static void Main()
{
var s = new Source { Value = 1 };
var values = Observable.Return(s.Value).Concat(
Observable.FromEvent(
h => s.ValueChanged += h,
h => s.ValueChanged -= h)
.Select(_ => s.Value));
using (values.Subscribe(Console.WriteLine))
{
s.Value = 2; // prints 1,2 as expected
}
using (values.Subscribe(Console.WriteLine))
{
s.Value = 3; // prints 1,3 - expected 2,3
}
}
}
class Source
{
private int _value;
public int Value
{
get { return _value; }
set
{
if (_value == value)
return;
_value = value;
if (ValueChanged != null)
ValueChanged(this, EventArgs.Empty);
}
}
public event EventHandler ValueChanged;
}
How do I create an IObservable that works as expected?
The main idea here is that if you want to update the value of an Observable you have to do this inside the 'create' function. This will be possible if you declare a function inside this 'create' function. For this implementation, you only need one Service.
Observables Can Emit Data and Notifications Synchronously Observables can also emit data and notifications synchronously. The observable function above produces data synchronously.
Observables are declarative —that is, you define a function for publishing values, but it is not executed until a consumer subscribes to it. The subscribed consumer then receives notifications until the function completes, or until they unsubscribe.
The solution is to subscribe a BehaviorSubject
to the observable, and to subscribe all observers to the BehaviorSubject
. The BehaviorSubject
will remember the last notification and notify new observers of it upon subscription.
Have a look at the Observable.Publish
extension method that has a initialValue
parameter. This creates an IConnectableObservable
that internally uses an BehaviorSubject
.
var s = new Source { Value = 1 };
var values = Observable.FromEvent(h => s.ValueChanged += h,
h => s.ValueChanged -= h)
.Select(e => e.NewValue)
.Publish(s.Value);
using (values.Connect()) // subscribes subject to event
{
using (values.Subscribe(Console.WriteLine)) // subscribes to subject
{
s.Value = 2;
} // unsubscribes from subject
using (values.Subscribe(Console.WriteLine)) // subscribes to subject
{
s.Value = 3;
} // unsubscribes from subject
} // unsubscribes subject from event
(untested)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With