Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Is unsubscribe thread safe in RxJava?

Suppose I have the following RxJava code (which accesses a DB, but the exact use case is irrelevant):

public Observable<List<DbPlaceDto>> getPlaceByStringId(final List<String> stringIds) {
    return Observable.create(new Observable.OnSubscribe<List<DbPlaceDto>>() {
        @Override
        public void call(Subscriber<? super List<DbPlaceDto>> subscriber) {
            try {
                Cursor c = getPlacseDb(stringIds);

                List<DbPlaceDto> dbPlaceDtoList = new ArrayList<>();
                while (c.moveToNext()) {
                    dbPlaceDtoList.add(getDbPlaceDto(c));
                }
                c.close();

                if (!subscriber.isUnsubscribed()) {
                    subscriber.onNext(dbPlaceDtoList);
                    subscriber.onCompleted();
                }
            } catch (Exception e) {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onError(e);
                }
            }
        }
    });
}

Given this code, I have the following questions:

  1. If someone unsubscribes from the observable returned from this method (after a previous subscription), is that operation thread-safe? So are my 'isUnsubscribed()' checks correct in this sense, regardless of scheduling?

  2. Is there a cleaner way with less boilerplate code to check for unsubscribed states than what I'm using here? I couldn't find anything in the framework. I thought SafeSubscriber solves the issue of not forwarding events when the subscriber is unsubscribed, but apparently it does not.

like image 517
Zsombor Erdődy-Nagy Avatar asked Sep 22 '15 17:09

Zsombor Erdődy-Nagy


People also ask

Is RxJava thread safe?

TL;TR: most of RxJava Operators and Subjects are NOT thread safe.

What is RxJava subscriber?

In RxJava 2 org. reactivestreams. Subscriber is an interface complying to Reactive Streams specification. The main difference from Observable is that new Subscriber supports backpressure. Observer is subscribed to Observable , and Subscriber is subscribed to Flowable (implements org.


1 Answers

is that operation thread-safe?

Yes. You are receiving an rx.Subscriber which (eventually) checks against a volatile boolean that is set to true when the subscriber's subscription is unsubscribed.

cleaner way with less boilerplate code to check for unsubscribed states

The SyncOnSubscribe and the AsyncOnSubscribe (available as an @Experimental api as of release 1.0.15) was created for this use case. They function as a safe alternative to calling Observable.create. Here is a (contrived) example of the synchronous case.

public static class FooState {
    public Integer next() {
        return 1;
    }
    public void shutdown() {

    }
    public FooState nextState() {
        return new FooState();
    }
}
public static void main(String[] args) {
    OnSubscribe<Integer> sos = SyncOnSubscribe.createStateful(FooState::new, 
            (state, o) -> {
                o.onNext(state.next());
                return state.nextState();
            }, 
            state -> state.shutdown() );
    Observable<Integer> obs = Observable.create(sos); 
}

Note that the SyncOnSubscribe next function is not allowed to call observer.onNext more than once per iteration nor can it call into that observer concurrently. Here are a couple of links to the SyncOnSubscribe implementation and tests on the head of the 1.x branch. It's primary usage is to simplify writing observables that iterate or parsing over data synchronously and onNext downstream but doing so in a framework that supports back-pressure and checks if unsubscribed. Essentially you would create a next function which would get invoked every time the downstream operators need a new data element onNexted. Your next function can call onNext either 0 or 1 time.

The AsyncOnSubscribe is designed to play nicely with back pressure for observable sources that operate asynchronously (such as off-box calls). The arguments to your next function include the request count and your provided observable should provide an observable that fulfills data up to that requested amount. An example of this behavior would be paginated queries from an external datasource.

Previously it was a safe practice to transform your OnSubscribe to an Iterable and use Observable.from(Iterable). This implementation gets an iterator and checks subscriber.isUnsubscribed() for you.

like image 112
Aaron Avatar answered Oct 12 '22 03:10

Aaron