
Can RxJava reduce() be unsafe when parallelized?

I want to use the reduce() operator on an Observable to collect its items into a Guava ImmutableList, which I much prefer to the standard ArrayList.

Observable<String> strings = ...

Observable<ImmutableList<String>> captured = strings.reduce(ImmutableList.<String>builder(), (b,s) -> b.add(s))
                .map(ImmutableList.Builder::build);

captured.forEach(i -> System.out.println(i));

Simple enough. But suppose that somewhere upstream the strings observable is scheduled to emit on multiple threads in parallel. Would that not derail the reduce() operation and possibly cause a race condition, especially since the single ImmutableList.Builder would then be mutated from several threads?

tmn asked Jun 04 '15 02:06


2 Answers

The problem lies in the state shared between realizations of the chain. This is pitfall #8 on my blog:

Shared state in an Observable chain

Let's assume you are dissatisfied with the performance or the type of the List the toList() operator returns and you want to roll your own aggregator instead of it. For a change, you want to do this by using existing operators and you find the operator reduce():

Observable<Vector<Integer>> list = Observable
    .range(1, 3)
    .reduce(new Vector<Integer>(), (vector, value) -> {
        vector.add(value);
        return vector;
    });

list.subscribe(System.out::println);
list.subscribe(System.out::println);
list.subscribe(System.out::println);

When you run the three subscribe() calls, the first prints what you'd expect, but the second prints a vector in which the range 1-3 appears twice, and the third prints 9 elements!

The problem is not with the reduce() operator itself but with the expectations surrounding it. The new Vector is created once, when the chain is assembled, so it is effectively a 'global' instance shared between all evaluations of the chain.

Naturally, there is a way of fixing this without implementing a dedicated operator for the purpose (which should be quite simple if you see the potential in the previous CounterOp):

Observable<Vector<Integer>> list2 = Observable
    .range(1, 3)
    .reduce((Vector<Integer>)null, (vector, value) -> {
        if (vector == null) {
            vector = new Vector<>();
        }
        vector.add(value);
        return vector;
    });

list2.subscribe(System.out::println);
list2.subscribe(System.out::println);
list2.subscribe(System.out::println);

You need to start with null and create the vector inside the accumulator function, so that each subscriber gets its own vector instead of a shared one.

Alternatively, you can look into the collect() operator which has a factory callback for the initial value.

The rule of thumb here is to be cautious whenever you see an aggregator-like operator taking some plain value: this 'initial value' will most likely be shared across all subscribers. If you plan to consume the resulting stream with multiple subscribers, they will clash and may give you unexpected results or even crash.
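The difference between a plain seed value and a seed factory can be modeled without RxJava. The sketch below is a plain-Java analogy, not RxJava code: ColdReduce, withSharedSeed and withSeedFactory are hypothetical names, and each Supplier.get() call stands in for one subscription that re-runs the fold over 1..3.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java model of a cold chain: every get() call re-runs the fold,
// the way every subscribe() re-runs reduce(). Hypothetical names, not RxJava API.
final class ColdReduce {

    // Seed captured once, when the chain is built: every run mutates the same list.
    static Supplier<List<Integer>> withSharedSeed(List<Integer> seed) {
        return () -> fold(seed);
    }

    // Seed created per run: each run starts from its own fresh list.
    static Supplier<List<Integer>> withSeedFactory(Supplier<List<Integer>> factory) {
        return () -> fold(factory.get());
    }

    // Stand-in for range(1, 3).reduce(seed, accumulator).
    private static List<Integer> fold(List<Integer> acc) {
        for (int value = 1; value <= 3; value++) {
            acc.add(value);
        }
        return acc;
    }

    public static void main(String[] args) {
        Supplier<List<Integer>> shared = withSharedSeed(new ArrayList<>());
        System.out.println(shared.get().size()); // 3
        System.out.println(shared.get().size()); // 6
        System.out.println(shared.get().size()); // 9

        Supplier<List<Integer>> fresh = withSeedFactory(ArrayList::new);
        System.out.println(fresh.get().size()); // 3
        System.out.println(fresh.get().size()); // 3
    }
}
```

With RxJava itself, and assuming the 1.x collect(stateFactory, collector) overload mentioned above, the equivalent chain would probably read something like Observable.range(1, 3).collect(() -> new Vector<Integer>(), (vector, value) -> vector.add(value)).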

akarnokd answered Oct 08 '22 18:10


According to the Observable contract, an Observable must not make onNext calls in parallel, so you have to modify your strings Observable to respect this. You can use the serialize() operator to achieve this.
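To illustrate what "no parallel onNext calls" buys the accumulator, here is a plain-Java sketch of that guarantee. It is only a model: SerializedSink and runProducers are hypothetical names, and the simple lock is an assumption made for illustration, not how RxJava implements serialize(). The ArrayList stands in for any accumulator that is not thread-safe on its own, such as a builder.

```java
import java.util.ArrayList;
import java.util.List;

// Model of the serialization guarantee: calls into the accumulator never overlap.
// Hypothetical names; the lock is a simplification, not RxJava's implementation.
final class SerializedSink {
    private final List<String> items = new ArrayList<>(); // not thread-safe by itself
    private final Object lock = new Object();

    // Many threads may call this; the lock lets only one of them run at a time.
    void onNext(String value) {
        synchronized (lock) {
            items.add(value);
        }
    }

    int size() {
        synchronized (lock) {
            return items.size();
        }
    }

    // Starts `threads` producers that each emit `perThread` values and waits for all.
    static int runProducers(int threads, int perThread) {
        SerializedSink sink = new SerializedSink();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            final int id = t;
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    sink.onNext(id + "-" + i);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return sink.size();
    }

    public static void main(String[] args) {
        System.out.println(runProducers(4, 10_000)); // 40000
    }
}
```

Without the lock, concurrent add() calls on the ArrayList could lose elements or throw, which is the kind of race the question is worried about.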

Samuel Gruetter answered Oct 08 '22 16:10