Let's say I have an event-emitting data source that I want to transform into a reactive stream. The data source is bound to a resource (for example, a socket that periodically sends updated state), so I want to share a single subscription to that resource. Using a single observable with the replay (so that new subscribers immediately get the current value) and refCount operators seems well suited for that. For example, this is how the MyDataProvider singleton would look:
```java
import io.reactivex.Observable;

public class MyDataProvider {
    private final Observable<MyData> myDataObservable = Observable.<MyData>create(emitter -> {
                // Open my resource here and emit data into observable
            })
            .doOnDispose(() -> {
                // Close my resource here
            })
            .replay(1)
            .refCount();

    public Observable<MyData> getMyDataObservable() {
        return myDataObservable;
    }
}
```
However, now let's say I have another data source that needs the result of the first data source to compute its own value:
```java
private final Observable<AnotherData> anotherDataObservable = getMyDataProvider().getMyDataObservable()
        .flatMap(myData -> {
            // Call another data source and return its result (as an Observable) here
        });

public Observable<AnotherData> getAnotherDataObservable() {
    return anotherDataObservable;
}
```
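To make the problem concrete, here is a minimal sketch, assuming RxJava 2.x (io.reactivex) is on the classpath. The class name UnicastAfterRefCountDemo and the Integer/String payloads standing in for MyData and AnotherData are hypothetical. It counts how often the source and the flatMap lambda run when the derived observable gets two subscribers.

```java
import io.reactivex.Observable;
import java.util.concurrent.atomic.AtomicInteger;

public class UnicastAfterRefCountDemo {
    /** Returns {source subscriptions, flatMap calls} after two subscribers. */
    static int[] countCalls() {
        AtomicInteger sourceSubscriptions = new AtomicInteger();
        AtomicInteger flatMapCalls = new AtomicInteger();

        // Stand-in for myDataObservable: emits one value and never completes
        Observable<Integer> myData = Observable.<Integer>create(emitter -> {
                    sourceSubscriptions.incrementAndGet(); // "open the resource"
                    emitter.onNext(1);
                })
                .replay(1)
                .refCount();

        // Stand-in for anotherDataObservable: chained after refCount, so it is unicast
        Observable<String> anotherData = myData.flatMap(value -> {
            flatMapCalls.incrementAndGet(); // "call another data source"
            return Observable.just("derived-" + value);
        });

        anotherData.subscribe(v -> { });
        anotherData.subscribe(v -> { });
        return new int[] {sourceSubscriptions.get(), flatMapCalls.get()};
    }

    public static void main(String[] args) {
        int[] counts = countCalls();
        System.out.println("source subscriptions: " + counts[0]); // 1
        System.out.println("flatMap calls: " + counts[1]);        // 2
    }
}
```

The resource is opened only once (the second subscriber is served from the replay buffer), but the flatMap lambda runs once per subscriber.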
Here my setup starts to fall apart. Multicasting of the first observable only applies up to the refCount operator; everything chained after it is unicast again. That means that if two separate subscriptions to anotherDataObservable are made, the flatMap operator is called twice. I see two workarounds for this, but I dislike both:
The simplest workaround seems to be to save the unicast variant of myDataObservable somewhere before the multicast operation is applied, and then perform that multicast operation in anotherDataObservable. However, if those two observables are located in different modules, this workaround would make the code very inelegant, requiring MyDataProvider to expose two different observables that seemingly return the same data.
The second workaround is to simply apply the replay and refCount operators again in anotherDataObservable. But this is inefficient: the first multicast operator in myDataObservable is already applied, but now does nothing except waste memory and CPU cycles.
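A minimal sketch of that second workaround, again assuming RxJava 2.x and using hypothetical names and Integer/String stand-ins for the real data types. Adding replay(1).refCount() after the flatMap makes the derived stream shared, so the lambda runs once even with two subscribers; the upstream replay stays in place as well.

```java
import io.reactivex.Observable;
import java.util.concurrent.atomic.AtomicInteger;

public class ReMulticastDemo {
    /** Returns how many times the flatMap lambda ran for two subscribers. */
    static int flatMapCallsWithSecondMulticast() {
        AtomicInteger flatMapCalls = new AtomicInteger();

        // Stand-in for MyDataProvider's already-multicast observable
        Observable<Integer> myData = Observable.<Integer>create(emitter -> emitter.onNext(1))
                .replay(1)
                .refCount();

        // Second workaround: multicast again after the expensive step
        Observable<String> anotherData = myData
                .flatMap(value -> {
                    flatMapCalls.incrementAndGet();
                    return Observable.just("derived-" + value);
                })
                .replay(1)
                .refCount();

        anotherData.subscribe(v -> { });
        anotherData.subscribe(v -> { });
        return flatMapCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("flatMap calls: " + flatMapCallsWithSecondMulticast()); // 1
    }
}
```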
Both workarounds also couple AnotherDataProvider to MyDataProvider. If MyDataProvider changes in the future and multicasting is no longer desired, I would also have to update AnotherDataProvider to remove the multicasting operators from there.
What would be a more elegant way to resolve this problem? Could I have architected this better to avoid the issue altogether?
About your first approach: in the current setup, your anotherDataObservable uses myDataObservable, and as I understand it they are logically coupled because they use the same source. So you would need some shared base logic for them. I would extract it into a common module that exposes the unicast version of the observable, and then have myDataObservable and anotherDataObservable use it in their own modules, each adding its own multicast logic.
Another option would be to have a class that monitors your resource by subscribing to it as in myDataObservable, does the processing in onNext, and publishes the mapped result through one Subject (e.g. a BehaviorSubject if you always want access to the last published value) and the raw result through another. Clients subscribe to those subjects and get the mapped or raw values, which are computed only once in the monitoring class.
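P.S. Remember that Subjects do not support backpressure themselves, so add a backpressure strategy (for example via toFlowable(BackpressureStrategy.LATEST)) before you subscribe to them.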
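A minimal sketch of such a monitoring class, assuming RxJava 2.x. MyDataMonitor, computeAnotherData and the Integer/String payloads are hypothetical; the constructor takes the unicast resource observable as a parameter. The derived value is computed once per raw value inside onNext, and both subjects are exposed as Flowables with a LATEST backpressure strategy.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.BehaviorSubject;

public class MyDataMonitor {
    // Latest raw and mapped values; new subscribers receive the current one immediately
    private final BehaviorSubject<Integer> rawSubject = BehaviorSubject.create();
    private final BehaviorSubject<String> mappedSubject = BehaviorSubject.create();
    private final Disposable resourceSubscription;

    // 'resource' stands in for the unicast Observable that opens the socket (hypothetical)
    public MyDataMonitor(Observable<Integer> resource) {
        resourceSubscription = resource.subscribe(raw -> {
            rawSubject.onNext(raw);
            // The derived computation runs here, once per raw value
            mappedSubject.onNext(computeAnotherData(raw));
        }, error -> {
            rawSubject.onError(error);
            mappedSubject.onError(error);
        });
    }

    // Hypothetical placeholder for "call another data source"
    private static String computeAnotherData(int raw) {
        return "another-" + raw;
    }

    // Subjects are not backpressured, so convert them with an explicit strategy
    public Flowable<Integer> rawData() {
        return rawSubject.toFlowable(BackpressureStrategy.LATEST);
    }

    public Flowable<String> anotherData() {
        return mappedSubject.toFlowable(BackpressureStrategy.LATEST);
    }

    public void close() {
        resourceSubscription.dispose();
    }

    public static void main(String[] args) {
        MyDataMonitor monitor = new MyDataMonitor(Observable.just(1, 2, 3));
        monitor.anotherData().subscribe(System.out::println); // another-3
        monitor.rawData().subscribe(System.out::println);     // 3
        monitor.close();
    }
}
```

Clients only see Flowables, so whether the monitor multicasts internally stays an implementation detail of this one class.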
If those options do not suit you, consider whether it is really important to avoid calling flatMap multiple times. Your code is quite straightforward as it is, and simplicity is an important metric too. If flatMap is not heavy, you can just let it run multiple times.