Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJava2 Publish

What is the difference between

ObservableTransformer {
    Observable.merge(
        it.ofType(x).compose(transformerherex),
        it.ofType(y).compose(transformerherey)
    )
}

and

ObservableTransformer {
    it.publish{ shared ->
        Observable.merge(
            shared.ofType(x).compose(transformerherex),
            shared.ofType(y).compose(transformerherey)
        )
    }
}

when I run my code using this two, I got the same results. What does publish do here.

like image 408
Hohenheim Avatar asked Aug 29 '17 04:08

Hohenheim


1 Answers

The difference is that the top transformer will subscribe to the upstream twice for a single subscription from the downstream, duplicating any side effects of the upstream which is usually not wanted:

Observable<Object> mixedSource = Observable.<Object>just("a", 1, "b", 2, "c", 3)
      .doOnSubscribe(s -> System.out.println("Subscribed!"));


mixedSource.compose(f ->
   Observable.merge(
      f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)),
      f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase()))
   )
)
.subscribe(System.out::println);

will print

Subscribed!
2
3
4
Subscribed!
A
B
C

The side-effect represented here is the printout Subscribed! Depending on the actual work in a real source, that could mean sending an email twice, retrieving the rows of a table twice. With this particular example, you can see that even if the source values are interleaved in their type, the output contains them separately.

In contrast, publish(Function) will establish one subscription to the source per one end subscriber, thus any side-effects at the source only happen once.

mixedSource.publish(f ->
   Observable.merge(
      f.ofType(Integer.class).compose(g -> g.map(v -> v + 1)),
      f.ofType(String.class).compose(g -> g.map(v -> v.toUpperCase()))
   )
)
.subscribe(System.out::println);

which prints

Subscribed!
A
2
B
3
C
4

because the source is subscribed once and each item is multicast to the two "arms" of the .ofType().compose().

like image 68
akarnokd Avatar answered Oct 13 '22 19:10

akarnokd