
RxJava - Opposite of switchMap() Operator?

I am wondering if there is a way to compose existing operators to perform the opposite of a switchMap().

switchMap() always chases the latest emission it receives and cancels whatever Observable it was previously running. Suppose I flipped that: I want an xxxMap() operator that ignores all incoming emissions while it is busy with the first one it received. It would keep ignoring emissions until the current inner Observable finishes emitting, and then process the next emission that arrives.

Observable.interval(1, TimeUnit.SECONDS)
        .doOnNext(i -> System.out.println("Source Emitted Value: " + i))
        .ignoreWhileBusyMap(i -> doIntensiveProcess(i).subscribeOn(Schedulers.computation()))
        .subscribe(i -> System.out.println("Subscriber received Value: " + i));

Is there a way to accomplish this? In the above example, if doIntensiveProcess() were to last three seconds, ignoreWhileBusyMap() would process 0 but likely ignore emissions 1 and 2 coming from interval(). It would then process 3 but likely ignore 4 and 5, and so on.
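
To make the intended behavior concrete, here is a minimal virtual-time sketch in plain Java, with no RxJava involved. The class and method names are hypothetical. It assumes value i arrives at (i + 1) periods, as with interval(), and that a value arriving at the exact instant the work ends is accepted; in a real pipeline that tie is a race, which is why the description above says "likely".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: models "ignore while busy" on a virtual clock.
final class IgnoreWhileBusySketch {

    /**
     * Returns the values that would be processed.
     * periodMs: gap between source emissions (value i arrives at (i + 1) * periodMs).
     * workMs:   how long processing one value takes.
     * Assumption: an arrival at the exact instant the work ends is accepted.
     */
    static List<Long> processed(long periodMs, long workMs, int emissions) {
        List<Long> accepted = new ArrayList<>();
        long busyUntil = 0; // virtual time at which the current work ends
        for (long i = 0; i < emissions; i++) {
            long arrival = (i + 1) * periodMs;
            if (arrival >= busyUntil) {
                // gate is open: take the value and close the gate
                accepted.add(i);
                busyUntil = arrival + workMs;
            }
            // otherwise the value arrives while busy and is ignored
        }
        return accepted;
    }

    public static void main(String[] args) {
        // 1 s period, 3 s of work, 10 emissions
        System.out.println(processed(1000, 3000, 10)); // prints [0, 3, 6, 9]
    }
}
```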

asked May 11 '16 at 19:05 by tmn




1 Answer

Sure: gate the processing of each value with a boolean flag that is reopened once the processing has finished:

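// Imports assume RxJava 1.x, which the use of toBlocking() suggests
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import rx.Observable;

// true = idle and ready to accept the next value
AtomicBoolean gate = new AtomicBoolean(true);

Observable.interval(200, TimeUnit.MILLISECONDS)
.flatMap(v -> {
    // atomically close the gate; proceed only if it was open
    if (gate.compareAndSet(true, false)) {
        return Observable.just(v).delay(500, TimeUnit.MILLISECONDS)
                // reopen the gate once the inner Observable terminates
                .doAfterTerminate(() -> gate.set(true));
    } else {
        // busy: ignore this value
        return Observable.empty();
    }
})
.take(10)
.toBlocking()
.subscribe(System.out::println, Throwable::printStackTrace);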

Edit

Alternative: let backpressure drop the values, with flatMap() limited to one concurrent inner Observable:

Observable.interval(200, TimeUnit.MILLISECONDS)
// drop values that the downstream has not requested
.onBackpressureDrop()
// maxConcurrent = 1: at most one inner Observable runs at a time
.flatMap(v -> Observable.just(v).delay(500, TimeUnit.MILLISECONDS), 1)
.take(10)
.toBlocking()
.subscribe(System.out::println, Throwable::printStackTrace);

You can change onBackpressureDrop() to onBackpressureLatest() to continue immediately with the latest value once the current one finishes, rather than waiting for the next emission.
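
The difference between the two policies can be illustrated with a small virtual-time model in plain Java. This is an idealized sketch, not RxJava's actual scheduling. The class and method names are hypothetical. It assumes value i arrives at (i + 1) periods, that work finishing at the same instant as an arrival is treated as finishing first, and that a value still held when the emissions run out is left unprocessed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model comparing "drop while busy" with "keep the latest while busy".
final class DropVsLatestSketch {

    // Drop policy: values arriving while busy are discarded.
    static List<Long> drop(long periodMs, long workMs, int emissions) {
        List<Long> accepted = new ArrayList<>();
        long busyUntil = 0; // virtual time at which the current work ends
        for (long i = 0; i < emissions; i++) {
            long arrival = (i + 1) * periodMs;
            if (arrival >= busyUntil) {
                accepted.add(i);
                busyUntil = arrival + workMs;
            }
        }
        return accepted;
    }

    // Latest policy: the most recent value seen while busy is held
    // and starts as soon as the current work ends.
    static List<Long> latest(long periodMs, long workMs, int emissions) {
        List<Long> accepted = new ArrayList<>();
        long busyUntil = 0;
        Long held = null; // most recent value that arrived while busy
        for (long i = 0; i < emissions; i++) {
            long arrival = (i + 1) * periodMs;
            if (held != null && busyUntil <= arrival) {
                // the work ended at or before this arrival: the held value started then
                accepted.add(held);
                busyUntil += workMs;
                held = null;
            }
            if (arrival >= busyUntil) {
                accepted.add(i);              // idle: process right away
                busyUntil = arrival + workMs;
            } else {
                held = i;                     // busy: overwrite the held value
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // 200 ms period and 500 ms of work, as in the snippets above
        System.out.println(drop(200, 500, 10));   // prints [0, 3, 6, 9]
        System.out.println(latest(200, 500, 10)); // prints [0, 2, 4, 7]
    }
}
```

In this model the drop policy leaves an idle gap between the end of one unit of work and the next arrival, while the latest policy fills that gap with the newest value it saw.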

answered Oct 13 '22 at 20:10 by akarnokd