Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Why doesn't doOnNext() get called?

Neither onNext() nor onCompleted() get called for my subscriber below. I've tried implementing the subscriber via doOnNext()/doOnTerminate(). I also tried doAfterTerminate(). I've also tried explicitly defining a subscriber and neither onNext() nor onCompleted() got called for it.

According to RxJS reduce doesn't continue, it's the reduce() that's not terminating so I tried adding the take(1) but that didn't work. In the same stackoverflow question someone said the problem might be my stream never closes. Aside from take(1), maybe there is some other way I should close the stream, but I don't understand ReactiveX well enough yet.

According to Why is OnComplete not called in this code? (RxAndroid), it could be that the original stream in the series doesn't terminate. But I don't see why that would matter if I'm calling take(1) which I think is supposed to emit a termination signal.

Basically, why doesn't the following line get executed?

System.out.println("doOnNext map.size()=" + map.size());

Even though the following line of code gets executed 98 times:

map.put(e.getKey(), e.getValue());

JsonObjectObservableRequest.java

import com.android.volley.toolbox.JsonObjectRequest;
...
public class JsonObjectObservableRequest {

    public JsonObjectObservableRequest(int method, String url, JSONObject request) {
    jsonObjectRequest = new JsonObjectRequest(method, url, request, getResponseListener(),     getResponseErrorListener());
    }

    private Response.Listener<JSONObject> getResponseListener() {
        return new Response.Listener<JSONObject>() {
            @Override
            public void onResponse(JSONObject response) {
                publishSubject.onNext(Observable.just(response));
            }
        };
    }

    private Response.ErrorListener getResponseErrorListener() {
        return new Response.ErrorListener() {
            @Override
            public void onErrorResponse(VolleyError error) {
                Observable<JSONObject> myError = Observable.error(error);
                publishSubject.onNext(myError);
            }
        };
    }

    public JsonObjectRequest getJsonObjectRequest() {
        return jsonObjectRequest;
    }

    public Observable<JSONObject> getObservable() {
    return publishSubject.flatMap(new Func1<Observable<JSONObject>, Observable<    JSONObject>>() {
            @Override
            public Observable<JSONObject> call(Observable<JSONObject> jsonObjectObservable) {
                return jsonObjectObservable;
            }
        });
    }
}

JsonObjectObservableRequest calling code

import com.android.volley.Request;
import com.android.volley.RequestQueue;
...
    JsonObjectObservableRequest jsonObjectObservableRequest = new JsonObjectObservableRequest(Request.Method.GET, idURLString, null, keyId, key);
    Observable<JSONObject> jsonObjectObservable = jsonObjectObservableRequest.getObservable();

    jsonObjectObservable
            .map(json -> {
                try {
                    return NetworkAccountIdDatasource.parseIdJSON(json);
                } catch (JSONException e) {
                    e.printStackTrace();
                    return null;
                }
            })
            .flatMapIterable(x -> x)
            .map(s -> new AbstractMap.SimpleEntry<String, String>("Name of " + s, "short id for " + s.substring(4)))
            .reduce(new HashMap<String, String>(), (map, e) -> {
                map.put(e.getKey(), e.getValue());
                return map;
            })
            .take(1)
            .doOnNext(map -> {
                System.out.println("doOnNext map.size()=" + map.size());
            })
            .doOnTerminate(() -> {
                System.out.println("doOnTerminate");
            })
            .subscribe();

    final RequestQueue queue = Volley.newRequestQueue(context);
    queue.add(jsonObjectObservableRequest.getJsonObjectRequest());
like image 759
Michael Osofsky Avatar asked Dec 04 '25 04:12

Michael Osofsky


1 Answers

You identified correctly the problem, you're not calling onCompleted() on the source network Observable (at the JsonObjectObservableRequest class), while the take(1) is not helpful either because you put it before the reduce.
As you were probably understood, reduce must operate on Observable with finite number of emissions, as it's emit the accumulated item when all items have been emitted and thus it's relies on the onCompleted() event to know when the observed stream has end.

  • I think that in your case, the best thing is to change the way JsonObjectObservableRequest operates, you don't need a Subject for this, you can use creation methods to wrap a callback (you can see my answer here, and read more about bridging between callback world to RxJava here.
  • Moreover, you don't need the emit an Observable of something and then flatmap it to items, you can simply emit the item with onNext() and emit an error with onError(), actually you're hiding the errors and converting them to an emission, this might make it harder to handle errors down the stream.
  • You should callonCompleted() at onResponse() after you call onNext() to signal the completion. and in the case of error, signaling onError will notify termination of the stream.
  • Another concern is canceling the request which seems not handled in this way, you can read the sources up to see how it can be done when wrapping callbacl call.
like image 162
yosriz Avatar answered Dec 05 '25 19:12

yosriz