
RxJava, one observable multiple subscribers: publish().autoConnect()


I'm playing around with rxJava/rxAndroid and there's something very basic that doesn't behave as I'd expect. I have this one observable and two subscribers:

Observable<Integer> dataStream = Observable.just(1, 2, 3).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());

Log.d(TAG, "subscribing sub1...");
dataStream.subscribe(v -> System.out.println("Subscriber #1: " + v));

Log.d(TAG, "subscribing sub2...");
dataStream.subscribe(v -> System.out.println("Subscriber #2: " + v));

And this is the output:

D/RxJava: subscribing sub1...
D/RxJava: subscribing sub2...
D/RxJava: Subscriber #1: 1
D/RxJava: Subscriber #1: 2
D/RxJava: Subscriber #1: 3
D/RxJava: Subscriber #2: 1
D/RxJava: Subscriber #2: 2
D/RxJava: Subscriber #2: 3

Now, I know I could avoid repeating the count by using publish().autoConnect(), but I'm trying to understand this default behaviour first. Each time someone subscribes to the observable, it starts emitting the number sequence. I get that. So, when Subscriber 1 connects, it starts emitting items. Subscriber 2 connects right away, so why isn't it getting the values as well?

This is how I understand it, from the perspective of the observable:

  1. Someone subscribed to me, I should start emitting items
    [SUBSCRIBERS: 1][ITEMS TO EMIT: 1,2,3]

  2. Emit item '1' to subscribers
    [SUBSCRIBERS: 1][ITEMS TO EMIT: 2,3]

  3. Someone else subscribed to me, I'll emit 1,2,3 once again when I'm done
    [SUBSCRIBERS: 1 & 2][ITEMS TO EMIT: 2,3,1,2,3]

  4. Emit item '2' to subscribers
    [SUBSCRIBERS: 1 & 2][ITEMS TO EMIT: 3,1,2,3]

  5. Emit item '3' to subscribers
    [SUBSCRIBERS: 1 & 2][ITEMS TO EMIT: 1,2,3]

  6. Emit item '1' to subscribers
    [SUBSCRIBERS: 1 & 2][ITEMS TO EMIT: 2,3]

  7. ...

But this is not how it works. It's like they are two separate observables in one. This confuses me: why don't they give the items to all subscribers?

Bonus:

How is it that publish().autoConnect() fixes the problem? Let's break it down. publish() gives me a connectable observable. A connectable observable is just like a regular observable, but you can tell it when to connect. Then I go ahead and tell it to connect right away by calling autoConnect().

By doing so... don't I get the same thing I started with? A plain regular observable. The operators appear to cancel each other.

I could just shut up and use publish().autoConnect(), but I'd like to understand more about how observables work.

Thanks!

frankelot asked Jan 28 '17 at 22:01





2 Answers

This is because those are, in fact, two separate observables. They are "spawned" when you invoke subscribe(). Therefore the steps you have provided are incorrect, in the sense that steps 3 and 4 are just steps 1 and 2 again, but on a different observable.
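A toy model in plain Java can make the "spawned on subscribe" idea concrete. It does not use RxJava: ColdSource, subscribe and generatorRuns are hypothetical names chosen for illustration. The only behaviour it models is the one described above, namely that a cold source runs its emission logic once per subscribe call, so each subscriber gets its own private 1, 2, 3.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ColdDemo {
    public static void main(String[] args) {
        ColdSource dataStream = new ColdSource(List.of(1, 2, 3));
        List<String> log = new ArrayList<>();
        dataStream.subscribe(v -> log.add("Subscriber #1: " + v));
        dataStream.subscribe(v -> log.add("Subscriber #2: " + v));
        log.forEach(System.out::println);
        System.out.println("generator runs: " + dataStream.generatorRuns()); // 2, one per subscribe call
    }
}

// Hypothetical toy model of a cold source; this is not how RxJava is implemented internally.
class ColdSource {
    private final List<Integer> items;
    private int generatorRuns = 0; // how many times the "subscribe function" has run

    ColdSource(List<Integer> items) {
        this.items = items;
    }

    // Each call starts a brand-new emission run that only this one subscriber sees.
    void subscribe(Consumer<Integer> subscriber) {
        generatorRuns++;
        for (Integer item : items) {
            subscriber.accept(item);
        }
    }

    int generatorRuns() {
        return generatorRuns;
    }
}
```

Because this toy emits synchronously inside subscribe, subscriber 1 finishes before subscriber 2 even starts. In the question, the grouped ordering has a different cause (the schedulers), which is what the rest of this answer is about.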

But you see them as 1 1 1 2 2 2 because of the thread on which the logging happens. If you were to remove the observeOn() part, you would see the emissions interleaved. To see this, run the code below:

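// RxJava 2.x imports (RxJava 3 moved these under io.reactivex.rxjava3)
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import org.junit.Test;

@Test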
public void test() throws InterruptedException {
    final Scheduler single = Schedulers.single();
    final long l = System.nanoTime();
    Observable<Long> dataStream =
            Observable.just(1, 2, 3)
                    .map(i -> System.nanoTime())
                    .subscribeOn(Schedulers.computation());
                    //.observeOn(single);

    dataStream.subscribe(i -> System.out.println("1  " + Thread.currentThread().getName() + " " + (i - l)));
    dataStream.subscribe(i -> System.out.println("2  " + Thread.currentThread().getName() + " " + (i - l)));

    Thread.sleep(1000);
}

The output, at least in my run, was (notice the thread names):

1  RxComputationThreadPool-1 135376988
2  RxComputationThreadPool-2 135376988
1  RxComputationThreadPool-1 135486815
2  RxComputationThreadPool-2 135537383
1  RxComputationThreadPool-1 135560691
2  RxComputationThreadPool-2 135617580

And if you apply the observeOn(), it becomes:

1  RxSingleScheduler-1 186656395
1  RxSingleScheduler-1 187919407
1  RxSingleScheduler-1 187923753
2  RxSingleScheduler-1 186656790
2  RxSingleScheduler-1 187860148
2  RxSingleScheduler-1 187864889

As you have correctly pointed out, to get what you want you need the publish().refCount() operator, or simply share() (it is an alias for it).

This is because publish() creates a ConnectableObservable, which does not start emitting items until told to do so via the connect() method. For example, if you do this:

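// RxJava 2.x imports (RxJava 3 moved these under io.reactivex.rxjava3)
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.schedulers.Schedulers;
import org.junit.Test;

@Test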
public void test() throws InterruptedException {
    final Scheduler single = Schedulers.single();
    final long l = System.nanoTime();
    ConnectableObservable<Long> dataStream =
            Observable.just(1, 2, 3)
                    .map(i -> System.nanoTime())
                    .subscribeOn(Schedulers.computation())
                    .observeOn(single)
                    .publish();

    dataStream.subscribe(i -> System.out.println("1  " + (i - l)));
    dataStream.subscribe(i -> System.out.println("2  " + (i - l)));

    Thread.sleep(1000);
    dataStream.connect();
    Thread.sleep(1000);

}

You will notice that for the first second (the first Thread.sleep() invocation) nothing happens, and the emissions happen only after dataStream.connect() is called.
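The same "nothing happens until connect()" behaviour can be illustrated without RxJava. ToyConnectable is a hypothetical class, not RxJava's ConnectableObservable: in this sketch, subscribe only registers an observer, and connect runs the upstream once and hands every item to all registered observers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ConnectableDemo {
    public static void main(String[] args) {
        ToyConnectable<Integer> dataStream = new ToyConnectable<>(List.of(1, 2, 3));
        List<String> log = new ArrayList<>();
        dataStream.subscribe(v -> log.add("1  " + v));
        dataStream.subscribe(v -> log.add("2  " + v));
        System.out.println("before connect: " + log); // nothing has been emitted yet
        dataStream.connect();
        System.out.println("after connect: " + log);
    }
}

// Hypothetical toy model of publish(): subscribing only registers, connect() starts the single upstream run.
class ToyConnectable<T> {
    private final List<T> upstreamItems;
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    ToyConnectable(List<T> upstreamItems) {
        this.upstreamItems = upstreamItems;
    }

    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber); // no emission happens here
    }

    // Runs the upstream exactly once and forwards each item to every registered subscriber.
    void connect() {
        for (T item : upstreamItems) {
            for (Consumer<T> s : subscribers) {
                s.accept(item);
            }
        }
    }
}
```

In this sketch each item reaches both subscribers before the next item is emitted, which is much closer to the mental model described in the question.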

refCount() is called on a ConnectableObservable and hides from subscribers the need to call connect() by counting how many subscribers are currently subscribed. Upon the first subscription it calls connect(), and after the last unsubscription it unsubscribes from the original observable.
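The counting rule described above can be sketched as a small counter. RefCountToy is hypothetical and only records "connect" and "disconnect" events; the real operator also has to deal with thread safety, errors and completion, none of which this sketch attempts.

```java
import java.util.ArrayList;
import java.util.List;

class RefCountDemo {
    public static void main(String[] args) {
        RefCountToy shared = new RefCountToy();
        Runnable unsubscribeA = shared.subscribe(); // first subscriber: connect
        Runnable unsubscribeB = shared.subscribe(); // second subscriber reuses the connection
        unsubscribeA.run();
        unsubscribeB.run();                         // last one leaves: disconnect
        Runnable unsubscribeC = shared.subscribe(); // a new subscriber: connect again
        unsubscribeC.run();
        System.out.println(shared.events());        // [connect, disconnect, connect, disconnect]
    }
}

// Hypothetical toy model of refCount(): connect on 0 -> 1 subscribers, disconnect on 1 -> 0.
class RefCountToy {
    private int subscriberCount = 0;
    private final List<String> events = new ArrayList<>();

    // Returns the action that unsubscribes this particular subscriber.
    Runnable subscribe() {
        if (subscriberCount++ == 0) {
            events.add("connect");
        }
        boolean[] done = {false}; // makes a second call of the returned action a no-op
        return () -> {
            if (done[0]) {
                return;
            }
            done[0] = true;
            if (--subscriberCount == 0) {
                events.add("disconnect");
            }
        };
    }

    List<String> events() {
        return events;
    }
}
```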

As to publish().autoConnect() seeming to cancel itself out: afterwards you do get an observable, but it has one special property. Say the original observable makes an API call over the Internet that lasts 10 seconds. Without share(), you will end up with as many parallel requests to the server as there were subscriptions during those 10 seconds. With share(), you will have only one call.
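The difference in the number of server calls can be simulated in plain Java. Everything here is hypothetical: callServer stands in for the slow API call, and the pending CompletableFutures stand in for requests that are still in flight during those 10 seconds. One caveat: a CompletableFuture also hands its cached result to subscribers that arrive after it has completed, which publish() does not do, so the sketch only models subscribers that arrive while the call is still running.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class ShareDemo {
    static int serverCalls = 0;
    // Requests still "on the wire"; completing them later stands in for the 10-second wait.
    static final List<CompletableFuture<String>> inFlight = new ArrayList<>();
    // The single request that shared subscribers attach to (created by the first subscriber).
    static CompletableFuture<String> sharedRequest = null;

    // Hypothetical stand-in for the slow API call: it only counts and parks the request.
    static CompletableFuture<String> callServer() {
        serverCalls++;
        CompletableFuture<String> response = new CompletableFuture<>();
        inFlight.add(response);
        return response;
    }

    // Cold behaviour: every subscriber triggers its own request.
    static void subscribeUnshared(Consumer<String> subscriber) {
        callServer().thenAccept(subscriber);
    }

    // Shared behaviour (roughly what publish().autoConnect() gives you): the first subscriber
    // starts the request, later subscribers attach to the same in-flight response.
    static void subscribeShared(Consumer<String> subscriber) {
        if (sharedRequest == null) {
            sharedRequest = callServer();
        }
        sharedRequest.thenAccept(subscriber);
    }

    // Subscribes three times while the request is in flight, then lets the responses arrive.
    static int run(boolean shared, List<String> received) {
        serverCalls = 0;
        sharedRequest = null;
        for (int i = 0; i < 3; i++) {
            if (shared) {
                subscribeShared(received::add);
            } else {
                subscribeUnshared(received::add);
            }
        }
        inFlight.forEach(r -> r.complete("response")); // the "10 seconds" are over
        inFlight.clear();
        return serverCalls;
    }

    public static void main(String[] args) {
        List<String> a = new ArrayList<>();
        System.out.println("without sharing: server calls = " + run(false, a) + ", responses = " + a.size());
        List<String> b = new ArrayList<>();
        System.out.println("with sharing: server calls = " + run(true, b) + ", responses = " + b.size());
    }
}
```

Both runs deliver three responses, but only the shared run makes a single server call.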

You will not see any benefit if the shared observable completes its work very fast (like just(1, 2, 3)).

autoConnect()/refCount() give you an intermediate observable that you subscribe to instead of the original observable.

If you are interested, dive into this book: Reactive Programming with RxJava.

MatBos answered Oct 07 '22 at 05:10



Regular (cold) observable

At the heart of an Observable is its subscribe function. Every time a new observer subscribes, it is passed to this function as a parameter. The function feeds data into that single observer by calling its onNext method. It might do this immediately (as just does), via some scheduler (e.g. interval), or from a background thread or callback (e.g. by launching some async task).

I emphasized the word "single" above because that is the only observer this function knows about when it is invoked. If you subscribe to such an observable multiple times, its subscribe function is called once for every subscriber.

A data source like this is called a cold observable.
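The idea of an observable as "just a subscribe function" can be sketched with a single-method interface. ToyObservable, immediate and fromBackgroundThread are hypothetical names, not RxJava API. The sketch contrasts a source that feeds its one observer immediately, inside the subscribe call (similar in spirit to just), with one that feeds it later from a thread it starts itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

class SubscribeFunctionDemo {
    // Hypothetical minimal "observable": nothing but a subscribe function that receives ONE observer.
    interface ToyObservable<T> {
        void subscribe(Consumer<T> observer);
    }

    // Feeds the observer immediately, before subscribe returns (similar in spirit to just()).
    static ToyObservable<Integer> immediate() {
        return observer -> {
            observer.accept(1);
            observer.accept(2);
            observer.accept(3);
        };
    }

    // Feeds the observer later, from a background thread that the subscribe function starts.
    static ToyObservable<Integer> fromBackgroundThread(List<Thread> started) {
        return observer -> {
            Thread t = new Thread(() -> {
                observer.accept(1);
                observer.accept(2);
                observer.accept(3);
            });
            started.add(t);
            t.start();
        };
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> a = new ArrayList<>();
        immediate().subscribe(a::add);
        System.out.println("immediate, right after subscribe: " + a);

        List<Integer> b = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        fromBackgroundThread(threads).subscribe(b::add);
        for (Thread t : threads) {
            t.join(); // wait for the background feed to finish
        }
        System.out.println("background, after join: " + b);
    }
}
```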

Schedulers

Applying the subscribeOn operator adds an intermediate step between your subscribe call and the original observable's subscribe function. You no longer call it directly; instead, the call is scheduled via the specified scheduler.

observeOn adds a similar intermediate step to every onNext invocation of your observer.
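A rough sketch of these two intermediate steps, using plain ExecutorServices in place of RxJava schedulers. The subscribeOn and observeOn helpers and the thread names toy-io and toy-main are hypothetical; the sketch only shows where each step moves the work, not how RxJava actually implements the operators.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class SchedulerStepsDemo {
    // Hypothetical minimal observable: just a subscribe function.
    interface ToyObservable<T> {
        void subscribe(Consumer<T> observer);
    }

    // subscribeOn-like step: the call to the source's subscribe function is handed to a scheduler.
    static <T> ToyObservable<T> subscribeOn(ToyObservable<T> source, ExecutorService scheduler) {
        return observer -> scheduler.execute(() -> source.subscribe(observer));
    }

    // observeOn-like step: every onNext is handed to another scheduler before it reaches the observer.
    static <T> ToyObservable<T> observeOn(ToyObservable<T> source, ExecutorService scheduler) {
        return observer -> source.subscribe(item -> scheduler.execute(() -> observer.accept(item)));
    }

    static List<String> run() throws InterruptedException {
        ExecutorService io = Executors.newFixedThreadPool(2, r -> new Thread(r, "toy-io"));
        ExecutorService mainThread = Executors.newSingleThreadExecutor(r -> new Thread(r, "toy-main"));
        List<String> log = Collections.synchronizedList(new ArrayList<>());

        ToyObservable<Integer> just = observer -> {
            for (int i = 1; i <= 3; i++) {
                log.add("emit " + i + " on " + Thread.currentThread().getName());
                observer.accept(i);
            }
        };
        observeOn(subscribeOn(just, io), mainThread)
                .subscribe(v -> log.add("receive " + v + " on " + Thread.currentThread().getName()));

        io.shutdown();
        io.awaitTermination(1, TimeUnit.SECONDS);
        mainThread.shutdown();
        mainThread.awaitTermination(1, TimeUnit.SECONDS);
        return new ArrayList<>(log); // snapshot copied under the synchronized list's lock
    }

    public static void main(String[] args) throws InterruptedException {
        run().forEach(System.out::println);
    }
}
```

Every "emit" line is logged on toy-io and every "receive" line on toy-main, which is the two-hop picture described above.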

In your example the subscribe function is called twice, i.e. the data series is generated twice. The calls are scheduled via the multi-threaded io scheduler, so these invocations happen not on the main thread but on two other threads, almost simultaneously. Each thread starts invoking the onNext method of its own subscriber; remember that each thread knows only about its own subscriber. The onNext calls are then scheduled by the mainThread scheduler, which is single-threaded, i.e. they cannot happen simultaneously and have to be queued somehow. Strictly speaking, there is no guarantee about the ordering of these calls: it depends on various factors and is implementation specific. Try replacing just with interval (this introduces a delay between messages) and you will see the messages arrive in a different order.
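The scenario in this paragraph can be approximated with plain executors: a two-thread pool stands in for the io scheduler, a single-threaded executor stands in for the main thread, and each of the two subscriptions runs its own 1, 2, 3. This is a hypothetical model, not RxJava code. In this model, each subscriber's own items stay in order (the single-threaded executor runs tasks first-in, first-out), while the interleaving between subscriber #1 and subscriber #2 may differ from run to run.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class TwoSubscriptionsDemo {
    static List<String> run() throws InterruptedException {
        ExecutorService io = Executors.newFixedThreadPool(2);            // stands in for Schedulers.io()
        ExecutorService mainThread = Executors.newSingleThreadExecutor(); // stands in for the Android main thread
        List<String> delivered = Collections.synchronizedList(new ArrayList<>());

        for (int sub = 1; sub <= 2; sub++) {
            final int subscriber = sub;
            // Each subscribe call gets its own generator run on an io thread...
            io.execute(() -> {
                for (int v = 1; v <= 3; v++) {
                    final int value = v;
                    // ...and each onNext is queued onto the single "main" thread.
                    mainThread.execute(() -> delivered.add("Subscriber #" + subscriber + ": " + value));
                }
            });
        }

        io.shutdown();
        io.awaitTermination(1, TimeUnit.SECONDS);
        mainThread.shutdown();
        mainThread.awaitTermination(1, TimeUnit.SECONDS);
        return new ArrayList<>(delivered);
    }

    public static void main(String[] args) throws InterruptedException {
        // The interleaving between #1 and #2 can change from run to run.
        run().forEach(System.out::println);
    }
}
```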

Hot observable

The publish operator makes your observable hot, aka connectable. It adds intermediate steps both to the subscribe function (which is now called only once) and to the onNext methods (whose calls are now propagated to all subscribed observers). In other words, it allows multiple subscribers to share a single subscription.

To be precise, the original subscribe function is called when you invoke the connect method. There are two operators that invoke connect automatically for you:

  • autoConnect invokes the connect method when the first subscriber comes in. It never disconnects, though.
  • refCount invokes connect when the first subscriber comes in, and automatically disconnects when the last subscriber unsubscribes. It will reconnect (call the subscribe function again) when new subscribers come in.
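The "never disconnects" part of autoConnect can be seen in a small counter sketch. AutoConnectToy is hypothetical and only records connect events. In this model, letting the subscriber count drop to zero and then subscribing again does not produce a second connect, whereas the refCount behaviour described above would connect again.

```java
import java.util.ArrayList;
import java.util.List;

class AutoConnectDemo {
    public static void main(String[] args) {
        AutoConnectToy shared = new AutoConnectToy();
        Runnable a = shared.subscribe(); // the first subscriber triggers connect
        a.run();                         // everyone leaves, but the upstream stays connected
        Runnable b = shared.subscribe(); // no second connect
        b.run();
        System.out.println(shared.events()); // [connect]
    }
}

// Hypothetical toy model of autoConnect(): connect once on the first subscriber, never disconnect.
class AutoConnectToy {
    private boolean connected = false;
    private int subscriberCount = 0;
    private final List<String> events = new ArrayList<>();

    // Returns the action that unsubscribes this subscriber.
    Runnable subscribe() {
        subscriberCount++;
        if (!connected) {
            connected = true;
            events.add("connect");
        }
        // Leaving only lowers the count; unlike refCount(), nothing is disconnected at zero.
        return () -> subscriberCount--;
    }

    int subscriberCount() {
        return subscriberCount;
    }

    List<String> events() {
        return events;
    }
}
```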

publish().refCount() is a popular combination, so it has a shortcut: share().

For your education, try the following code with and without share():

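import io.reactivex.Observable; // RxJava 2.x package (RxJava 3: io.reactivex.rxjava3.core.Observable)
import java.util.concurrent.TimeUnit;

public class ShareExample {
    public static void main(String[] args) throws InterruptedException {
        Observable<Long> dataStream = Observable.interval(100, TimeUnit.MILLISECONDS)
                .take(3)
                .share(); // remove this line to compare with the unshared (cold) version
        System.out.println("subscribing A");
        dataStream.subscribe(v -> System.out.println("A got " + v));
        TimeUnit.MILLISECONDS.sleep(150);
        System.out.println("subscribing B");
        dataStream.subscribe(v -> System.out.println("B got " + v));
        TimeUnit.SECONDS.sleep(1); // keep the JVM alive while interval emits on a background thread
    }
}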
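To know what to look for in that experiment, a virtual-clock simulation can help, assuming ideal timing: interval(100 ms) emits 0 at 100 ms, 1 at 200 ms and 2 at 300 ms after its run starts, and B subscribes at 150 ms. The simulation does not use RxJava; valuesSeen is a hypothetical helper that returns which values a subscriber joining at a given time receives from a run started at a given time.

```java
import java.util.ArrayList;
import java.util.List;

class IntervalShareSimulation {
    static final long PERIOD_MS = 100; // interval(100, MILLISECONDS)
    static final int TAKE = 3;         // take(3)

    // Values that a subscriber joining at joinMs receives from an interval run that started at startMs.
    static List<Long> valuesSeen(long startMs, long joinMs) {
        List<Long> seen = new ArrayList<>();
        for (long tick = 0; tick < TAKE; tick++) {
            long emittedAt = startMs + (tick + 1) * PERIOD_MS; // interval emits 0 after the first period
            if (emittedAt >= joinMs) {
                seen.add(tick);
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        long aJoins = 0, bJoins = 150;
        // Without share(): B's subscription starts its own run at 150 ms.
        System.out.println("cold   A: " + valuesSeen(aJoins, aJoins) + "  B: " + valuesSeen(bJoins, bJoins));
        // With share(): A's subscription started the only run at 0 ms, and B joins it late.
        System.out.println("shared A: " + valuesSeen(aJoins, aJoins) + "  B: " + valuesSeen(aJoins, bJoins));
    }
}
```

In the simulation, B sees 0, 1, 2 in the cold case but only 1, 2 in the shared case, because the value 0 was emitted before B joined the single shared run.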

Answers to original questions

1) A cold observable always deals with a single subscriber. So your time diagrams should look like this:

subscribed first subscriber
[SUBSCRIBER: 1][ITEMS TO EMIT: 1,2,3]
subscribed second subscriber
[SUBSCRIBER: 1][ITEMS TO EMIT: 1,2,3]
[SUBSCRIBER: 2][ITEMS TO EMIT: 1,2,3]
emit "1" to subscriber 1
[SUBSCRIBER: 1][ITEMS TO EMIT: 2,3]
[SUBSCRIBER: 2][ITEMS TO EMIT: 1,2,3]
emit "1" to subscriber 2
[SUBSCRIBER: 1][ITEMS TO EMIT: 2,3]
[SUBSCRIBER: 2][ITEMS TO EMIT: 2,3]
...

The order is not guaranteed, though, because of multithreading races.

2) publish and autoConnect do not cancel each other out. Each one only adds an intermediate step.

dataSource = ...;
dataSourceShared = dataSource.publish().autoConnect();

Now, when you subscribe multiple subscribers to dataSourceShared, this results in only one subscription to the original dataSource. That is, the source does not have to emit a new series of messages for every new subscriber.

Yaroslav Stavnichiy answered Oct 07 '22 at 04:10
