Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

In RxJava, how do I start a potentially infinite stream of events generated from an API?

I have an API available to clients that can be simplified to this:

public class API {
  public void sendEvent(Event e);
}

Event instances enter my system whenever a client calls the API (technically over Binder into a Service derivative) which are then processed, filtered and dispatched to other internal components. I don't care about past events, just those available from the time a subscriber subscribes. It seems like a natural fit for the Rx paradigm which I'm just getting my feet wet with.

I need an Observable that is created once, allows multiple subscribers, and can be fed instances of Event that are then sent through the reactive pipeline to observers. A Subject seems appropriate for what I'm looking to do (in particular, this answer to this question resonated with me).

What do other RxJava users recommend?

like image 298
scorpiodawg Avatar asked Jul 14 '15 19:07

scorpiodawg


2 Answers

For example, following on my short comment:

public class API implements OnSubscribe<Event> {
    private List<Subscriber<Event>> subscribers = new ArrayList<>();

    public void sendEvent(Event event) {
        // Do whatever you need with the event
        for (Subscriber<Event> sub : subscribers) {
            sub.onNext(event);
        }
    }
    public void call(Subscriber<Event> sub) {
        subscribers.add(sub);
    }
}

Then you probably have an instance somewhere: API api = ...

Your Observable is obtained like so: Observable.create(api); You can then do any normal thing you would do with an Observable.

The filtering of the unsubscribed Subscribers is left as an exercise to the reader.

Edit

A little more research shows that PublishSubject should help:

public class API {
    private PublishSubject<Event> subject = PublishSubject.create();

    public void sendEvent(Event event) {
        // Do whatever you need with the event
        // Then publish it
        subject.onNext(event);
    }
    public Observable<Event> getObservable() {
        return subject.asObservable();
    }
}

This way, you can subscribe to this Observable, and every time an event is sent to API, it is published to all subscribers.

Use like this:

API api = ...;
api.getObservable().subscribe(event -> doStuffWithEvent(event));
api.getObservable().subscribe(event -> doOtherStuffWithEvent(event));
like image 66
njzk2 Avatar answered Sep 16 '22 22:09

njzk2


Try observable.share() which under the covers calls .publish().refCount(). It will use only one underlying subscription and give you the multiple subscription behaviour you specified.

like image 33
Dave Moten Avatar answered Sep 17 '22 22:09

Dave Moten