Why do we need Publish and RefCount Rx operators in this case?

I'm trying to familiarise myself with the problem of reactive backpressure handling, specifically by reading this wiki: https://github.com/ReactiveX/RxJava/wiki/Backpressure

In the buffer paragraph, we have this more involved example code:

// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstyMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);

If I understand correctly, we're effectively debouncing the bursty source stream by generating a debounced signal stream for the buffer operator.

But why do we need the publish and refCount operators here? What would go wrong if we simply dropped them? The comments don't make it much clearer to me. Aren't RxJava Observables capable of multicasting by default?

Zsombor Erdődy-Nagy asked Oct 19 '22 01:10



1 Answer

The answer lies in the difference between hot and cold observables.

The buffer operator combines the two streams and has no way of knowing that they share a common source (as they do in your case). When it is subscribed to, it subscribes to both of them, which in turn triggers two distinct subscriptions to your original input.

Now one of two things can happen. If the input is a hot observable, subscribing merely registers a listener, and everything works as expected. If it is a cold observable, each subscription produces its own stream, and the two streams may differ and be out of sync.

For instance, a cold observable might perform a network request when it is subscribed to and then emit the result. Without publish, two requests will be made.
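To make the "two requests" point concrete, here is a dependency-free toy model in plain Java. It is only a sketch of the idea, not RxJava's implementation: ColdSource and its requestCount are hypothetical names, and the "network request" is simulated by a counter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ColdSourceDemo {

    // Toy stand-in for a cold observable (an assumption for illustration,
    // not an RxJava type): the work is redone for every subscriber.
    static class ColdSource {
        private int requestCount = 0;

        // Subscribing performs the simulated "request" and emits its result.
        void subscribe(Consumer<Integer> subscriber) {
            requestCount++;                 // side effect happens once per subscription
            subscriber.accept(requestCount); // the emitted "response" is the request number
        }

        int requestCount() {
            return requestCount;
        }
    }

    public static void main(String[] args) {
        ColdSource source = new ColdSource();
        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();

        // Two consumers of the "same" source, much like buffer() subscribing
        // to both its input and its closing selector.
        source.subscribe(first::add);
        source.subscribe(second::add);

        System.out.println("requests made: " + source.requestCount()); // requests made: 2
        System.out.println("first saw " + first + ", second saw " + second); // first saw [1], second saw [2]
    }
}
```

Each subscriber triggered its own request and received a different result, which is the desynchronization described above.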

Using publish followed by refCount (or connect) is the usual way to turn a cold observable into a hot one: it ensures that the source is subscribed to only once and that all downstream streams see the same items.
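The sharing idea can be sketched with a similar toy model in plain Java. Again, this is an illustration under stated assumptions rather than RxJava's actual code: ColdSource and Published are hypothetical classes, and Published mimics publish() with an explicit connect(). In RxJava, refCount() connects automatically when the first subscriber arrives and disconnects when the last one leaves; the toy uses a manual connect() to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class SharedSourceDemo {

    // Toy cold source (hypothetical): every subscription redoes the work,
    // and the emitted values depend on which subscription this is.
    static class ColdSource {
        private int subscriptions = 0;

        void subscribe(Consumer<Integer> subscriber) {
            subscriptions++;
            for (int i = 1; i <= 3; i++) {
                subscriber.accept(subscriptions * 10 + i);
            }
        }

        int subscriptions() {
            return subscriptions;
        }
    }

    // Toy analogue of publish() + connect(): collect subscribers first, then
    // subscribe upstream exactly once and fan each item out to all of them.
    static class Published {
        private final ColdSource upstream;
        private final List<Consumer<Integer>> subscribers = new ArrayList<>();

        Published(ColdSource upstream) {
            this.upstream = upstream;
        }

        void subscribe(Consumer<Integer> subscriber) {
            subscribers.add(subscriber);
        }

        void connect() {
            upstream.subscribe(item -> {
                for (Consumer<Integer> s : subscribers) {
                    s.accept(item);
                }
            });
        }
    }

    public static void main(String[] args) {
        ColdSource cold = new ColdSource();
        Published shared = new Published(cold);
        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();

        shared.subscribe(first::add);
        shared.subscribe(second::add);
        shared.connect(); // the single upstream subscription happens here

        System.out.println("upstream subscriptions: " + cold.subscriptions()); // upstream subscriptions: 1
        System.out.println(first + " " + second); // [11, 12, 13] [11, 12, 13]
    }
}
```

Both subscribers now receive identical items from one upstream subscription, which is what buffer needs when its source and its closing selector derive from the same input.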

Gluck answered Oct 21 '22 22:10
