Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJava Observable get notified on first emission

I have three Observables which I combine with combineLastest:

    Observable<String> o1 = Observable.just("1");
    Observable<String> o2 = Observable.just("2");
    Observable<String> o3 = Observable.just("3");

    Observable.combineLatest(o1, o2, o3, new Func3<String, String, String, Object>() {
        @Override
        public Object call(String s, String s2, String s3) {
            return null;
        }
    });

I want to be notified about the first emission of one of the Observables without ignoring the later emissions, which I guess first operator would do. Is there a convenient operator for that like (example):

    o1.doOnFirst(new Func1<String, Void>() {
        @Override
        public Void call(String s) {
            return null;
        }
    })
like image 862
fweigl Avatar asked Nov 22 '15 20:11

fweigl


1 Answers

For convenience, I created these extension functions for Flowable and Observable.
Note, that with doOnFirst() the action will be called before the first element emission, whilst doAfterFirst() will firstly emit the first item and then perform the action.

fun <T> Observable<T>.doOnFirst(onFirstAction: (T) -> Unit): Observable<T> =
    take(1)
        .doOnNext { onFirstAction.invoke(it) }
        .concatWith(skip(1))

fun <T> Flowable<T>.doOnFirst(onFirstAction: (T) -> Unit): Flowable<T> =
    take(1)
        .doOnNext { onFirstAction.invoke(it) }
        .concatWith(skip(1))

fun <T> Observable<T>.doAfterFirst(afterFirstAction: (T) -> Unit): Observable<T> =
    take(1)
        .doAfterNext { afterFirstAction.invoke(it) }
        .concatWith(skip(1))

fun <T> Flowable<T>.doAfterFirst(afterFirstAction: (T) -> Unit): Flowable<T> =
    take(1)
        .doAfterNext { afterFirstAction.invoke(it) }
        .concatWith(skip(1))

Usage is as simple as this:

Flowable.fromArray(1, 2, 3)
            .doOnFirst { System.err.println("First $it") }
            .subscribe { println(it) }

Output:

// First 1  
// 1  
// 2  
// 3

And:

Flowable.fromArray(1, 2, 3)
            .doAfterFirst { System.err.println("First $it") }
            .subscribe { println(it) }

Output:

// 1  
// First 1
// 2  
// 3
like image 75
Leonid Ustenko Avatar answered Nov 15 '22 18:11

Leonid Ustenko