
Behaviour of onNext and onComplete

I have an Observable that does some work without needing to emit a value. I also have a list of objects that I want the Observable to process. So for every element in this list: doSomething()

Observable.from(uris)
        .flatMap(new Func1<Uri, Observable<Void>>() {
            @Override
            public Observable<Void> call(Uri uri) {
                return createDoSomethingObservable(uri);
            }
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeOn(Schedulers.io())
        .subscribe(new Observer<Void>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "completed");
            }

            @Override
            public void onError(Throwable e) {
                // Log the failure instead of silently swallowing it
                Log.e(TAG, "error", e);
            }

            @Override
            public void onNext(Void aVoid) {
                Log.d(TAG, "next");
            }
        });

And the method that creates the Observable:

Observable<Void> createDoSomethingObservable(final Uri uri) {
    return Observable.create(new Observable.OnSubscribe<Void>() {
        @Override
        public void call(Subscriber<? super Void> subscriber) {
            //doSomething
            subscriber.onNext(null);
            subscriber.onCompleted();
        }
    });
}

Now when I run this with a List with 3 elements I get:

next
next
next
completed

This is what I wanted, but I don't understand why it works. At first I only called onCompleted, because in the end the observable does its job and completes. But then, of course, onNext is never called on the subscriber. The same goes the other way round: if I only call onNext, onCompleted is never called.

So my questions are:

  1. Why is onCompleted only called after the last list element?
  2. Is there a better way to solve this?
Kuno asked Jul 23 '15 10:07


3 Answers

onCompleted (like onError) is a terminal event: it is delivered at most once to a subscriber for the whole observable chain. That is part of the Observable contract that RxJava implements.

I think your approach is correct, so a better way is not needed.

pjanecze answered Nov 09 '22 08:11


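onCompleted is called after the last element because that is when the earliest observable in the chain (from(uris)) has finished and every observable returned from flatMap has completed as well.

The observables returned from flatMap are expected to call onCompleted. flatMap forwards their onNext values downstream but does not forward their onCompleted. In your case each inner observable runs synchronously, so once it has completed (and call has returned), the next emission from from can be worked on. Once from has finished emitting and all inner observables have completed, onCompleted is passed downstream and the chain is effectively finished.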
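The completion rule described above can be sketched without RxJava at all. The following is a simplified, single-threaded model and not RxJava's actual flatMap/merge implementation; FlatMapModel, SimpleObserver, SimpleSource and run are hypothetical names introduced only for illustration. It counts the outer source plus every inner source as "active" and signals onCompleted downstream exactly once, when that count reaches zero.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class FlatMapModel {

    /** Hypothetical stand-in for rx.Observer, reduced to two callbacks. */
    interface SimpleObserver<T> {
        void onNext(T value);
        void onCompleted();
    }

    /** Hypothetical stand-in for rx.Observable: pushes events to one observer. */
    interface SimpleSource<T> {
        void subscribe(SimpleObserver<T> observer);
    }

    /** Simplified flatMap over a list: forwards inner values, completes only once. */
    static <T, R> SimpleSource<R> flatMap(final List<T> items,
                                          final Function<T, SimpleSource<R>> mapper) {
        return downstream -> {
            // Starts at 1 to account for the outer source (the list) itself.
            final int[] active = {1};
            final Runnable completeOne = () -> {
                if (--active[0] == 0) {
                    downstream.onCompleted();
                }
            };
            for (T item : items) {
                active[0]++; // one more inner source is now running
                mapper.apply(item).subscribe(new SimpleObserver<R>() {
                    @Override public void onNext(R value) { downstream.onNext(value); }
                    // Inner completion is counted, not forwarded directly.
                    @Override public void onCompleted() { completeOne.run(); }
                });
            }
            completeOne.run(); // the outer source has no more items
        };
    }

    /** Runs the model and returns the events the final observer saw. */
    static List<String> run(List<String> uris, final boolean callNext, final boolean callCompleted) {
        final List<String> events = new ArrayList<>();
        // Mirrors createDoSomethingObservable from the question.
        Function<String, SimpleSource<Void>> createDoSomething = uri -> observer -> {
            if (callNext) observer.onNext(null);
            if (callCompleted) observer.onCompleted();
        };
        flatMap(uris, createDoSomething).subscribe(new SimpleObserver<Void>() {
            @Override public void onNext(Void v) { events.add("next"); }
            @Override public void onCompleted() { events.add("completed"); }
        });
        return events;
    }

    public static void main(String[] args) {
        // prints [next, next, next, completed]
        System.out.println(run(Arrays.asList("a", "b", "c"), true, true));
    }
}
```

In this model, calling run with callNext set to false yields only "completed", and calling it with callCompleted set to false yields three "next" events and no "completed", which matches the two variations described in the question.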

Adam S answered Nov 09 '22 07:11


I think this small piece of code will help you understand the behavior of onNext() and onCompleted().
Suppose you have a List<Uri>. Let's transform it into an Observable<Uri> manually.

public static Observable<Uri> getUries(final List<Uri> uriList){
    return Observable.create(new Observable.OnSubscribe<Uri>() {
        @Override
        public void call(Subscriber<? super Uri> subscriber) {
            for(Uri uri : uriList){
                subscriber.onNext(uri);
            }

            subscriber.onCompleted();
        }
    });
}

Or using a lambda expression:

public static Observable<Uri> getUries(List<Uri> uriList){
    return Observable.create(subscriber -> {
        for(Uri uri : uriList){
            subscriber.onNext(uri);
        }

        subscriber.onCompleted();
    });
}

As you can see, we iterate over the input list and call onNext() for every element. When we have finished transforming the List into an Observable, we call onCompleted().

P.S. This code is just a demonstration; please never use it to transform a List into an Observable. Use the Observable.from() operator for that.

UPDATE:

Implementation of the from() operator:

...
while (true) {
    if (o.isUnsubscribed()) {
        return;
    } else if (it.hasNext()) {
        o.onNext(it.next());
    } else if (!o.isUnsubscribed()) {
        o.onCompleted();
        return;
    } else {
        // is unsubscribed
        return;
    }
}
...

Link: https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java#L75-L87

Aleksandr answered Nov 09 '22 08:11