I have an API available to clients that can be simplified to this:
public class API {
    public void sendEvent(Event e);
}
Event instances enter my system whenever a client calls the API (technically over Binder into a Service derivative); they are then processed, filtered and dispatched to other internal components. I don't care about past events, only those that arrive after a subscriber subscribes. This seems like a natural fit for the Rx paradigm, which I'm just getting my feet wet with.
I need an Observable that is created once, allows multiple subscribers, and can be fed instances of Event that are then sent through the reactive pipeline to observers. A Subject seems appropriate for what I'm looking to do (in particular, this answer to this question resonated with me).
What do other RxJava users recommend?
For example, following on my short comment:
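import java.util.ArrayList;
import java.util.List;
import rx.Observable.OnSubscribe;
import rx.Subscriber;

public class API implements OnSubscribe<Event> {
    private final List<Subscriber<? super Event>> subscribers = new ArrayList<>();

    public void sendEvent(Event event) {
        // Do whatever you need with the event
        for (Subscriber<? super Event> sub : subscribers) {
            sub.onNext(event);
        }
    }

    // OnSubscribe<Event> is an Action1<Subscriber<? super Event>>, hence the wildcard
    @Override
    public void call(Subscriber<? super Event> sub) {
        subscribers.add(sub);
    }
}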
Then you probably have an instance somewhere: API api = ...
Your Observable is obtained like so: Observable.create(api);
You can then do any normal thing you would do with an Observable.
Filtering out the unsubscribed Subscribers is left as an exercise for the reader.
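For what it's worth, that exercise might look roughly like the sketch below. To keep it runnable without RxJava on the classpath, it uses a hypothetical SimpleSubscriber stand-in and a hypothetical PruningDispatcher in place of API; neither is part of RxJava. With the real rx.Subscriber you would check isUnsubscribed() in the same spot. The CopyOnWriteArrayList is my own assumption, chosen so that subscribing during dispatch does not break the iteration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for rx.Subscriber: just enough to show the pruning logic.
class SimpleSubscriber<T> {
    private final Consumer<T> onNext;
    private volatile boolean unsubscribed;

    SimpleSubscriber(Consumer<T> onNext) { this.onNext = onNext; }

    void onNext(T value) { onNext.accept(value); }
    void unsubscribe() { unsubscribed = true; }
    boolean isUnsubscribed() { return unsubscribed; }
}

// Hypothetical dispatcher playing the role of API.sendEvent() above.
class PruningDispatcher<T> {
    // Copy-on-write list: safe to iterate while another thread subscribes.
    private final List<SimpleSubscriber<T>> subscribers = new CopyOnWriteArrayList<>();

    void subscribe(SimpleSubscriber<T> sub) { subscribers.add(sub); }

    void sendEvent(T event) {
        // Drop anyone who has unsubscribed, then deliver to the rest.
        subscribers.removeIf(SimpleSubscriber::isUnsubscribed);
        for (SimpleSubscriber<T> sub : subscribers) {
            sub.onNext(event);
        }
    }

    int subscriberCount() { return subscribers.size(); }
}
```

Pruning on every send is the simplest option; if unsubscribes are rare and events are frequent, removing the subscriber at unsubscribe time would avoid the repeated scan.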
A little more research shows that PublishSubject should help:
import rx.Observable;
import rx.subjects.PublishSubject;

public class API {
    private final PublishSubject<Event> subject = PublishSubject.create();

    public void sendEvent(Event event) {
        // Do whatever you need with the event
        // Then publish it
        subject.onNext(event);
    }

    public Observable<Event> getObservable() {
        // Hide the Subject so callers cannot push events themselves
        return subject.asObservable();
    }
}
This way, you can subscribe to this Observable, and every time an event is sent to API, it is published to all subscribers.
Use like this:
API api = ...;
api.getObservable().subscribe(event -> doStuffWithEvent(event));
api.getObservable().subscribe(event -> doOtherStuffWithEvent(event));
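Since the question only cares about events that arrive after a subscriber subscribes, it may help to see that behaviour in isolation. The following is a dependency-free sketch, not RxJava's implementation: MiniPublishSubject is a hypothetical name, and the Runnable it returns is only a stand-in for a Subscription.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical, dependency-free stand-in for PublishSubject.
class MiniPublishSubject<T> {
    private final List<Consumer<T>> observers = new CopyOnWriteArrayList<>();

    // Returns a Runnable that removes the observer again.
    Runnable subscribe(Consumer<T> observer) {
        observers.add(observer);
        return () -> observers.remove(observer);
    }

    // Delivers to whoever is subscribed right now; nothing is buffered
    // for subscribers that show up later.
    void onNext(T value) {
        for (Consumer<T> observer : observers) {
            observer.accept(value);
        }
    }
}
```

A subscriber that joins after the first event sees only the second and later ones, which is the "no past events" behaviour asked for. If replaying history were wanted instead, RxJava's ReplaySubject or BehaviorSubject would be the place to look.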
Try observable.share(), which under the covers calls .publish().refCount(). It will use only one underlying subscription and give you the multiple subscription behaviour you specified.
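To illustrate what "only one underlying subscription" means, here is a rough, dependency-free sketch of the bookkeeping that publish().refCount() performs: connect upstream when the first subscriber arrives, disconnect when the last one leaves. RefCountedSource and its Function-based "upstream" are hypothetical names for illustration only; RxJava's actual operator is more involved.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in mimicking the connect/disconnect logic of publish().refCount().
class RefCountedSource<T> {
    // "Upstream" takes a sink to push values into and returns a handle that tears it down.
    private final Function<Consumer<T>, Runnable> upstream;
    private final List<Consumer<T>> observers = new CopyOnWriteArrayList<>();
    private Runnable upstreamCancel; // non-null while connected

    RefCountedSource(Function<Consumer<T>, Runnable> upstream) {
        this.upstream = upstream;
    }

    synchronized Runnable subscribe(Consumer<T> observer) {
        observers.add(observer);
        if (observers.size() == 1) {
            // First subscriber: open the single shared upstream subscription.
            upstreamCancel = upstream.apply(this::dispatch);
        }
        return () -> unsubscribe(observer);
    }

    private synchronized void unsubscribe(Consumer<T> observer) {
        if (observers.remove(observer) && observers.isEmpty()) {
            // Last subscriber gone: close the upstream subscription.
            upstreamCancel.run();
            upstreamCancel = null;
        }
    }

    // Fan each upstream value out to every current observer.
    private void dispatch(T value) {
        for (Consumer<T> observer : observers) {
            observer.accept(value);
        }
    }
}
```

However many observers subscribe, the upstream function is invoked once per connect cycle, so expensive setup work (a Binder connection, say) is not repeated per subscriber.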