What is the difference between
ObservableTransformer {
    Observable.merge(
        it.ofType(x).compose(transformerherex),
        it.ofType(y).compose(transformerherey)
    )
}
and
ObservableTransformer {
    it.publish { shared ->
        Observable.merge(
            shared.ofType(x).compose(transformerherex),
            shared.ofType(y).compose(transformerherey)
        )
    }
}
When I run my code using these two, I get the same results. What does publish do here?
The difference is that the top transformer subscribes to the upstream twice for each single subscription from the downstream. That duplicates any side effects of the upstream, which is usually not wanted:
Observable<Object> mixedSource = Observable.<Object>just("a", 1, "b", 2, "c", 3)
    .doOnSubscribe(s -> System.out.println("Subscribed!"));
mixedSource.compose(f ->
    Observable.merge(
        f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)),
        f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase()))
    )
)
.subscribe(System.out::println);
will print
Subscribed!
2
3
4
Subscribed!
A
B
C
The side effect represented here is the printout Subscribed!
Depending on the actual work in a real source, that could mean sending an email twice or retrieving the rows of a table twice. This particular example also shows that even though the source values are interleaved by type, the output groups them by type: the source emits synchronously, so merge runs the first arm to completion before it subscribes to the second.
In contrast, publish(Function) establishes one subscription to the source per end subscriber, so any side effects at the source happen only once:
mixedSource.publish(f ->
    Observable.merge(
        f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)),
        f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase()))
    )
)
.subscribe(System.out::println);
which prints
Subscribed!
A
2
B
3
C
4
because the source is subscribed to once and each item is multicast to the two "arms" of the .ofType().compose() chain.
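To see the two subscription patterns without RxJava on the classpath, here is a minimal plain-Java sketch. It is only a model of the behavior described above, not how RxJava implements publish or merge. The names ColdSource, subscribePerArm and subscribeOnceAndMulticast are hypothetical and introduced just for this illustration; the subscriptions counter stands in for the doOnSubscribe side effect.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class PublishSketch {

    /** Hypothetical stand-in for a cold, synchronous source: every subscribe() replays all items. */
    static final class ColdSource {
        int subscriptions = 0;           // counts the side effect, like doOnSubscribe
        private final Object[] items;

        ColdSource(Object... items) {
            this.items = items;
        }

        void subscribe(Consumer<Object> observer) {
            subscriptions++;
            for (Object item : items) {
                observer.accept(item);
            }
        }
    }

    /** Arm 1: keeps Integers and adds one (mirrors ofType(Integer.class).map(v -> v + 1)). */
    static Consumer<Object> integerArm(List<Object> out) {
        return v -> {
            if (v instanceof Integer) {
                out.add(((Integer) v) + 1);
            }
        };
    }

    /** Arm 2: keeps Strings and upper-cases them (mirrors ofType(String.class).map(toUpperCase)). */
    static Consumer<Object> stringArm(List<Object> out) {
        return v -> {
            if (v instanceof String) {
                out.add(((String) v).toUpperCase());
            }
        };
    }

    /** Models the compose + merge variant: each arm subscribes to the source on its own. */
    static List<Object> subscribePerArm(ColdSource source) {
        List<Object> out = new ArrayList<>();
        source.subscribe(integerArm(out));   // first arm drains the whole source
        source.subscribe(stringArm(out));    // second arm triggers a second subscription
        return out;
    }

    /** Models the publish(Function) variant: one subscription, each item handed to both arms. */
    static List<Object> subscribeOnceAndMulticast(ColdSource source) {
        List<Object> out = new ArrayList<>();
        Consumer<Object> ints = integerArm(out);
        Consumer<Object> strings = stringArm(out);
        source.subscribe(v -> {
            ints.accept(v);
            strings.accept(v);
        });
        return out;
    }

    public static void main(String[] args) {
        ColdSource first = new ColdSource("a", 1, "b", 2, "c", 3);
        System.out.println(subscribePerArm(first) + " subscriptions=" + first.subscriptions);

        ColdSource second = new ColdSource("a", 1, "b", 2, "c", 3);
        System.out.println(subscribeOnceAndMulticast(second) + " subscriptions=" + second.subscriptions);
    }
}
```

In this model the first call yields the values grouped by type with two subscriptions, and the second yields them interleaved with one subscription, matching the two printouts above.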