
How do I push an entity onto an Rx Observable?

I have a class that is responsible for generating events at frequent but irregular intervals, which other classes must consume and operate on. I want to use Reactive Extensions for this task.

The consumer side of this is very straightforward; I have my consumer class implementing IObserver<Payload> and all seems well. The problem comes on the producer class.

Implementing IObservable<Payload> directly (that is, writing my own implementation of IDisposable Subscribe(IObserver<Payload>)) is, according to the documentation, not recommended. It suggests instead composing with the Observable.Create() set of functions. Since my class will run for a long time, I've tried creating an Observable with var myObservable = Observable.Never(), and then, when I have new Payloads available, calling myObservable.Publish(payloadData). When I do this, though, I don't seem to hit the OnNext implementation in my consumer.

I think, as a work-around, I can create an event in my class, and then create the Observable using the FromEvent function, but this seems like an overly complicated approach (i.e., it seems weird that the new hotness of Observables 'requires' events to work). Is there a simple approach I'm overlooking here? What's the 'standard' way to create your own Observable sources?

GWLlosa asked Mar 25 '15 17:03


1 Answer

Create a new Subject<Payload> and call its OnNext method to send an event. You can Subscribe to the subject with your observer.
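As a minimal sketch of that approach, assuming the System.Reactive package is referenced: the Payload, Producer, and Consumer types and the Emit method below are hypothetical names for illustration, not something from the question. The producer keeps the Subject private and exposes it through AsObservable, so only the producer can push values.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical payload type standing in for the question's Payload class.
public sealed class Payload
{
    public Payload(int value) { Value = value; }
    public int Value { get; }
}

// Hypothetical producer: owns the Subject and exposes only IObservable<Payload>.
public sealed class Producer : IDisposable
{
    private readonly Subject<Payload> _subject = new Subject<Payload>();

    // AsObservable hides the Subject so consumers cannot call OnNext themselves.
    public IObservable<Payload> Payloads => _subject.AsObservable();

    // Call this whenever a new payload becomes available.
    public void Emit(Payload payload) => _subject.OnNext(payload);

    public void Dispose()
    {
        _subject.OnCompleted();
        _subject.Dispose();
    }
}

// Consumer implementing IObserver<Payload>, as described in the question.
public sealed class Consumer : IObserver<Payload>
{
    public void OnNext(Payload value) => Console.WriteLine($"Received {value.Value}");
    public void OnError(Exception error) => Console.WriteLine($"Error: {error.Message}");
    public void OnCompleted() => Console.WriteLine("Completed");
}

public static class Program
{
    public static void Main()
    {
        using (var producer = new Producer())
        using (producer.Payloads.Subscribe(new Consumer()))
        {
            // Each Emit call reaches Consumer.OnNext synchronously.
            producer.Emit(new Payload(1));
            producer.Emit(new Payload(2));
        }
    }
}
```

Disposing the value returned by Subscribe detaches the consumer; the Subject itself lives as long as the producer does.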

The use of Subjects is often debated. For a thorough discussion on the use of Subjects (which links to a primer), see here - but in summary, this use case sounds valid (i.e. it probably meets the local + hot criteria).

As an aside, overloads of Subscribe accepting delegates may remove the need for you to provide an implementation of IObserver<T>.
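A short sketch of the delegate-based overloads, again assuming System.Reactive is referenced. An int payload is used here purely to keep the example small; any payload type would work the same way.

```csharp
using System;
using System.Reactive.Subjects;

public static class Program
{
    public static void Main()
    {
        var subject = new Subject<int>();

        // Delegates take the place of a hand-written IObserver<int> class.
        IDisposable subscription = subject.Subscribe(
            value => Console.WriteLine($"Received {value}"),
            error => Console.WriteLine($"Error: {error.Message}"),
            () => Console.WriteLine("Completed"));

        subject.OnNext(1);
        subject.OnNext(2);
        subject.OnCompleted();

        subscription.Dispose();
    }
}
```

There is also an overload taking only the onNext delegate, which may be enough when errors and completion do not need special handling.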

James World answered Oct 05 '22 06:10
