Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Using disposed observer does not re-subscribe to the source

I'm trying to reuse an Observer for a Single and Observable stream by creating only a single instance of DisposableSingleObserver/DisposableObserver and passing them through it through the subscribeWith() method on the stream like below:

public class SomeClass {
    private DisposableSingleObserver<Object> observer;

    public SomeClass() {
        observer = new DisposableSingleObserver<Object>() {
            @Override
            public void onSuccess(Object object) {
                ...
            }

            @Override
            public void onError(Throwable throwable) {
                ...
            }
        };
    }

    public void doSomeStuff() {
        singleStream.subscribeOn(...)
            .observeOn(...)
            .subscribeWith(observer);
    }
}

The above code resulted in ProtocolViolationException when I tried to subscribe with the single observer instance multiple times with message:

io.reactivex.exceptions.ProtocolViolationException: It is not allowed to subscribe with a(n) com.package.name.SomeClass$1 multiple times. Please create a fresh instance of com.package.name.SomeClass$1 and subscribe that to the target source instead.

So I modified the code a bit like below:

public class SomeClass {
    ...

    public void doSomeStuff() {
        if (observer != null) {
            observer.dispose();
        }

        singleStream.subscribeOn(...)
            .observerOn(...)
            .subscribeWith(observer);
    }
}

When I executed the above code, the ProtocolViolationException was no longer thrown and I was able to get event from the single stream successfully. But when I tried to call doSomeStuff() method couple of time, it completed successfully on first call but did not emit any event on second call. I was able to confirm that the subscription happened through doOnSubscribe() both times but the code emitting event for single stream was never executed on second call. So I have three questions:

  1. Why was the ProtocolViolationException thrown in the first place?
  2. Why was I able to get the event on first subscription but not on the second subscription even though we subscribed to the source in both cases?
  3. How can I reuse observer by using only a single instance?
like image 200
Harry Avatar asked Oct 04 '17 22:10

Harry


1 Answers

1) You are not allowed to reuse the DisposableSingleObserver and its cousins as they are stateful and one time use only. This is due to the protocol mandated by Single: exactly one onSubscribe followed by at most one onSuccess or onError. A second subscription violates this protocol.

2) Disposing the DisposableSingleObserver puts it in a disposed state and any subsequent subscription attempt will be considered as immediately disposed.

3) You shouldn't. Why do you want to subscribe multiple times and why can't you use a new DisposableSingleObserver each time. You should rethink your dataflow.

like image 183
akarnokd Avatar answered Oct 09 '22 20:10

akarnokd