In my app, I have a method that is invoked every time an update happens on a queue on a server. The app is initialized to behave this way.
Now, each time the method is invoked with the latest data, I want to treat that call as one event in a stream, so that the calls feed a never-ending Observable with a subscriber.
The challenge I am facing is: how do I create an observable from a method that is being invoked? Below is my sample code.
// This method is invoked every time an update happens on the server
public virtual void MessageHandler(MyObject1 object1, MyObject2 object2)
{
    Observable.Create<MyObject3>(observer =>
    {
        var object3 = new MyObject3(object1, object2);
        observer.OnNext(object3);
        return Disposable.Empty;
    })
    .Subscribe(x => WriteLine($"Message acknowledged"));
}
But this creates a new observable each time the method is invoked, which is not what I want, and it does not look like the right approach either. I also read that using "Subject" or "AsyncSubject" is not the right way to solve this kind of problem.
The rule about not using Subject is more of a guideline, and one that hasn't been expressed very well.
In general, if you're using subjects in the middle of an observable pipeline then you're likely to be doing something wrong, and that should be avoided.
If you're using a Subject as the source for an observable, and you properly encapsulate the Subject and hide it, then you're fine. This usually means keeping it in a private field that only your code can access (so no-one else can call .OnCompleted() on it) and calling .AsObservable() on whatever you expose, so that no-one can cast your observable back to the underlying Subject.
In your case you're subscribing directly, so no .AsObservable() is needed, but I suspect that's just demo code. In the real world, make sure you hide the subject.
Here's what your code should look like:
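// Private, so only this class can push values into it.
private readonly Subject<MyObject3> _subject = new Subject<MyObject3>();

// Call once at start-up; a Subject does not replay values sent before subscription.
private void SetUpObservable()
{
    _subject.Subscribe(x => Console.WriteLine("Message acknowledged"));
}

// This method is invoked every time an update happens on the server.
public virtual void MessageHandler(MyObject1 object1, MyObject2 object2)
{
    _subject.OnNext(new MyObject3(object1, object2));
}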
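For the real-world case where other code subscribes, the encapsulation described above could look roughly like the sketch below. It assumes the System.Reactive package is referenced. The `MessageSource` class name, the `Messages` property, and the empty `MyObject1`/`MyObject2`/`MyObject3` bodies are hypothetical stand-ins, since the question does not show those types.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical stand-ins for the question's types.
public sealed class MyObject1 { }
public sealed class MyObject2 { }

public sealed class MyObject3
{
    public MyObject3(MyObject1 first, MyObject2 second)
    {
        First = first;
        Second = second;
    }

    public MyObject1 First { get; }
    public MyObject2 Second { get; }
}

// Hypothetical wrapper class; the name is an assumption.
public class MessageSource
{
    // Private, so outside code cannot call OnNext, OnError or OnCompleted on it.
    private readonly Subject<MyObject3> _subject = new Subject<MyObject3>();

    // AsObservable() wraps the subject, so callers cannot cast the result
    // back to Subject<MyObject3>.
    public IObservable<MyObject3> Messages => _subject.AsObservable();

    // Invoked by the server callback each time the queue updates.
    public virtual void MessageHandler(MyObject1 object1, MyObject2 object2)
    {
        _subject.OnNext(new MyObject3(object1, object2));
    }
}
```

Consumers then only ever see `IObservable<MyObject3>`, for example `source.Messages.Subscribe(x => Console.WriteLine("Message acknowledged"))`.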
Now, if you still want to avoid the Subject then you can do this as an alternative:
private Action<MyObject3> _delegate;

private void SetUpObservable()
{
    Observable
        .FromEvent<MyObject3>(h => _delegate += h, h => _delegate -= h)
        .Subscribe(x => Console.WriteLine("Message acknowledged"));
}

public virtual void MessageHandler(MyObject1 object1, MyObject2 object2)
{
    _delegate?.Invoke(new MyObject3(object1, object2));
}
In my opinion the Subject gives you better control and it is easier to set up.
In any case, you probably should retain the subscription IDisposable so that you can properly clean up.
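One possible way to retain the subscription is to store the IDisposable returned by Subscribe in a field and release it from your own Dispose method. This is only a sketch, assuming the System.Reactive package is referenced. The `AcknowledgingHandler` class, its `Acknowledged` counter, and the `string` payload (used in place of MyObject3 to keep the example short) are all hypothetical.

```csharp
using System;
using System.Reactive.Subjects;

// Hypothetical class that owns both the subject and its subscription.
public sealed class AcknowledgingHandler : IDisposable
{
    private readonly Subject<string> _subject = new Subject<string>();

    // Kept so the subscription can be released later.
    private readonly IDisposable _subscription;

    // Hypothetical counter, included only to make the effect observable.
    public int Acknowledged { get; private set; }

    public AcknowledgingHandler()
    {
        _subscription = _subject.Subscribe(_ =>
        {
            Acknowledged++;
            Console.WriteLine("Message acknowledged");
        });
    }

    // Stand-in for the question's MessageHandler; takes a string for brevity.
    public void MessageHandler(string payload) => _subject.OnNext(payload);

    public void Dispose()
    {
        // Unsubscribe first, then release the subject itself.
        // The handler should not be used after this point.
        _subscription.Dispose();
        _subject.Dispose();
    }
}
```

Wrapping the handler in a `using` block, or disposing it when the owning component shuts down, then cleans up the subscription along with it.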