I have a class that is going to be responsible for generating events at frequent but irregular intervals, which other classes must consume and operate on. I want to use Reactive Extensions for this task.
The consumer side of this is very straightforward; I have my consumer class implementing IObserver<Payload> and all seems well. The problem comes on the producer class.
Implementing IObservable<Payload> directly (that is, writing my own implementation of IDisposable Subscribe(IObserver<Payload>)) is, according to the documentation, not recommended. It suggests instead composing with the Observable.Create() set of functions. Since my class will run for a long time, I've tried creating an Observable with var myObservable = Observable.Never(), and then, when I have new Payloads available, calling myObservable.Publish(payloadData). When I do this, though, I don't seem to hit the OnNext implementation in my consumer.
I think, as a work-around, I can create an event in my class, and then create the Observable using the FromEvent function, but this seems like an overly complicated approach (i.e., it seems weird that the new hotness of Observables 'requires' events to work). Is there a simple approach I'm overlooking here? What's the 'standard' way to create your own Observable sources?
Create a new Subject<Payload> and call its OnNext method to send an event. You can Subscribe to the subject with your observer.
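As a minimal sketch of that suggestion, assuming the System.Reactive package is referenced: the Payload, Producer, Consumer, and SubjectDemo types below are hypothetical stand-ins for the classes described in the question, not anything from the original post. The producer keeps the subject private and hands out only an IObservable<Payload>, which is one common way to structure this rather than the only one.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical payload type standing in for the question's Payload.
public sealed class Payload
{
    public Payload(int value) { Value = value; }
    public int Value { get; }
}

// Hypothetical producer: owns the Subject and exposes it only as IObservable<Payload>.
public sealed class Producer
{
    private readonly Subject<Payload> _subject = new Subject<Payload>();

    // AsObservable() hides the Subject so consumers cannot call OnNext themselves.
    public IObservable<Payload> Payloads => _subject.AsObservable();

    // Call this whenever a new payload becomes available.
    public void Emit(Payload payload) => _subject.OnNext(payload);

    // Tell subscribers that no more payloads will arrive.
    public void Complete() => _subject.OnCompleted();
}

// Hypothetical consumer implementing IObserver<Payload>, as in the question.
public sealed class Consumer : IObserver<Payload>
{
    public List<int> Received { get; } = new List<int>();
    public bool Completed { get; private set; }

    public void OnNext(Payload value) => Received.Add(value.Value);
    public void OnError(Exception error) => Console.WriteLine("Error: " + error.Message);
    public void OnCompleted() => Completed = true;
}

public static class SubjectDemo
{
    public static Consumer Run()
    {
        var producer = new Producer();
        var consumer = new Consumer();

        // Subscribing returns an IDisposable; disposing it unsubscribes the consumer.
        using (producer.Payloads.Subscribe(consumer))
        {
            producer.Emit(new Payload(1));
            producer.Emit(new Payload(2));
            producer.Complete();
        }
        return consumer;
    }
}
```

Calling SubjectDemo.Run() should leave the consumer with both payload values recorded and its Completed flag set, since each Emit call is forwarded synchronously to the subscribed observer.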
The use of Subjects is often debated. For a thorough discussion on the use of Subjects (which links to a primer), see here - but in summary, this use case sounds valid (i.e. it probably meets the local + hot criteria).
As an aside, overloads of Subscribe accepting delegates may remove the need for you to provide an implementation of IObserver<T>.
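A rough illustration of the delegate-based overloads, again assuming System.Reactive is referenced: DelegateSubscribeDemo is a hypothetical name, and int is used in place of Payload only to keep the sketch short. The three lambdas play the roles of OnNext, OnError, and OnCompleted.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class DelegateSubscribeDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var subject = new Subject<int>();

        // No IObserver<int> class is needed: the delegates are wrapped in an observer for you.
        IDisposable subscription = subject.Subscribe(
            value => log.Add("next " + value),
            error => log.Add("error " + error.Message),
            () => log.Add("completed"));

        subject.OnNext(1);
        subject.OnNext(2);
        subject.OnCompleted();

        subscription.Dispose();
        return log;
    }
}
```

If only the values matter, the single-argument overload taking just the onNext delegate may be enough, although an error from the source is then rethrown rather than handled.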