Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJava: How to conditionally apply Operators to an Observable without breaking the chain

I have a chain of operators on an RxJava observable. I'd like to be able to apply one of two operators depending on a boolean value without "breaking the chain".

I'm relatively new to Rx(Java) and I feel like there's probably a more idiomatic and readable way of doing this than my current approach of introducing a temporary variable.

Here's a concrete example, buffering items from an observable if a batch size field is non-null, otherwise emitting a single batch of unbounded size with toList():

Observable<Item> source = Observable.from(newItems);
Observable<List<Item>> batchedSource = batchSize == null ?
                source.toList() :
                source.buffer(batchSize);
return batchedSource.flatMap(...).map(...)

Is something like this possible? (pseudo-lambdas because Java):

Observable.from(newItems)
    .applyIf(batchSize == null,
             { o.toList() },
             { o.buffer(batchSize) })
    .flatMap(...).map(...)
like image 424
user1405990 Avatar asked Mar 17 '16 10:03

user1405990


2 Answers

You can use compose(Func1) to stay in-sequence but do custom behavior

source
.compose(o -> condition ? o.map(v -> v + 1) : o.map(v -> v * v))
.filter(...)
.subscribe(...)
like image 130
akarnokd Avatar answered Nov 17 '22 23:11

akarnokd


You can also use filter operator with defaultIfEmpty if you wish to emit single value or switchIfEmpty if you wish to emit multiple values using another Observable.

val item = Observable.just("ABC")

item.filter { s -> s.startsWith("Z") }
    .defaultIfEmpty("None")
    .subscribe { println(it) }
like image 32
Thracian Avatar answered Nov 17 '22 21:11

Thracian