Neither onNext() nor onCompleted() gets called for my subscriber below. I've tried implementing the subscriber via doOnNext()/doOnTerminate(), and I also tried doAfterTerminate(). I've also tried explicitly defining a subscriber, and neither onNext() nor onCompleted() got called for it either.
According to "RxJS reduce doesn't continue", it's the reduce() that's not terminating, so I tried adding take(1), but that didn't work. In the same Stack Overflow question someone said the problem might be that my stream never closes. Aside from take(1), maybe there is some other way I should close the stream, but I don't understand ReactiveX well enough yet.
According to "Why is OnComplete not called in this code? (RxAndroid)", it could be that the original stream in the series doesn't terminate. But I don't see why that would matter if I'm calling take(1), which I think is supposed to emit a termination signal.
Basically, why doesn't the following line get executed?
System.out.println("doOnNext map.size()=" + map.size());
Even though the following line of code gets executed 98 times:
map.put(e.getKey(), e.getValue());
JsonObjectObservableRequest.java
import com.android.volley.toolbox.JsonObjectRequest;
...
public class JsonObjectObservableRequest {
    public JsonObjectObservableRequest(int method, String url, JSONObject request) {
        jsonObjectRequest = new JsonObjectRequest(method, url, request, getResponseListener(), getResponseErrorListener());
    }
    private Response.Listener<JSONObject> getResponseListener() {
        return new Response.Listener<JSONObject>() {
            @Override
            public void onResponse(JSONObject response) {
                publishSubject.onNext(Observable.just(response));
            }
        };
    }
    private Response.ErrorListener getResponseErrorListener() {
        return new Response.ErrorListener() {
            @Override
            public void onErrorResponse(VolleyError error) {
                Observable<JSONObject> myError = Observable.error(error);
                publishSubject.onNext(myError);
            }
        };
    }
    public JsonObjectRequest getJsonObjectRequest() {
        return jsonObjectRequest;
    }
    public Observable<JSONObject> getObservable() {
        return publishSubject.flatMap(new Func1<Observable<JSONObject>, Observable<JSONObject>>() {
            @Override
            public Observable<JSONObject> call(Observable<JSONObject> jsonObjectObservable) {
                return jsonObjectObservable;
            }
        });
    }
}
JsonObjectObservableRequest calling code
import com.android.volley.Request;
import com.android.volley.RequestQueue;
...
JsonObjectObservableRequest jsonObjectObservableRequest = new JsonObjectObservableRequest(Request.Method.GET, idURLString, null, keyId, key);
Observable<JSONObject> jsonObjectObservable = jsonObjectObservableRequest.getObservable();
jsonObjectObservable
    .map(json -> {
        try {
            return NetworkAccountIdDatasource.parseIdJSON(json);
        } catch (JSONException e) {
            e.printStackTrace();
            return null;
        }
    })
    .flatMapIterable(x -> x)
    .map(s -> new AbstractMap.SimpleEntry<String, String>("Name of " + s, "short id for " + s.substring(4)))
    .reduce(new HashMap<String, String>(), (map, e) -> {
        map.put(e.getKey(), e.getValue());
        return map;
    })
    .take(1)
    .doOnNext(map -> {
        System.out.println("doOnNext map.size()=" + map.size());
    })
    .doOnTerminate(() -> {
        System.out.println("doOnTerminate");
    })
    .subscribe();
final RequestQueue queue = Volley.newRequestQueue(context);
queue.add(jsonObjectObservableRequest.getJsonObjectRequest());
You identified the problem correctly: you're not calling onCompleted() on the source network Observable (in the JsonObjectObservableRequest class). The take(1) doesn't help either, because you put it after the reduce(), so it only sees what reduce() emits, and that is nothing until the source completes.
As you probably understood, reduce() must operate on an Observable with a finite number of emissions: it emits the accumulated item only once all items have been emitted, so it relies on the onCompleted() event to know when the observed stream has ended.
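To make that concrete, here is a minimal, dependency-free sketch of reduce-like behaviour in plain Java. It is a model only, not RxJava's implementation: `ReduceSketch`, `Sink`, `reducing` and `run` are hypothetical names invented for this illustration. The accumulator is updated on every item, but the result is handed downstream only when the completion signal arrives.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class ReduceSketch {

    /** Hypothetical stand-in for an Rx observer; not an RxJava type. */
    interface Sink<T> {
        void onNext(T item);
        void onCompleted();
    }

    /** Wraps a downstream sink so it receives one accumulated value, and only on completion. */
    static <T, R> Sink<T> reducing(R seed, BiFunction<R, T, R> accumulator, Sink<R> downstream) {
        return new Sink<T>() {
            private R state = seed;

            @Override
            public void onNext(T item) {
                // Every item updates the accumulator, but nothing is emitted yet.
                state = accumulator.apply(state, item);
            }

            @Override
            public void onCompleted() {
                // Only now is the accumulated value passed downstream.
                downstream.onNext(state);
                downstream.onCompleted();
            }
        };
    }

    /** Feeds the keys through the reducer and returns what the downstream sink observed. */
    public static List<String> run(List<String> keys, boolean complete) {
        List<String> log = new ArrayList<>();
        Sink<Map<String, String>> downstream = new Sink<Map<String, String>>() {
            @Override
            public void onNext(Map<String, String> map) {
                log.add("onNext size=" + map.size());
            }

            @Override
            public void onCompleted() {
                log.add("onCompleted");
            }
        };
        Sink<String> reducer = reducing(new HashMap<String, String>(), (map, key) -> {
            map.put(key, "value for " + key);
            return map;
        }, downstream);
        for (String key : keys) {
            reducer.onNext(key);
        }
        if (complete) {
            reducer.onCompleted();
        }
        return log;
    }

    public static void main(String[] args) {
        // Source never completes: the accumulator ran three times, yet nothing came out.
        System.out.println(run(Arrays.asList("a", "b", "c"), false)); // prints []
        // Source completes: the accumulated map is finally emitted.
        System.out.println(run(Arrays.asList("a", "b", "c"), true));  // prints [onNext size=3, onCompleted]
    }
}
```

The first call mirrors the situation in the question: map.put() runs for every item, but doOnNext() downstream of the reduce never fires because the completion signal never arrives.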
Consider changing how JsonObjectObservableRequest operates.
You don't need a Subject for this; you can use the creation methods to wrap a callback (see my other answer on this, and read more about bridging between the callback world and RxJava).
You also don't need to emit an Observable of something and then flatMap it to items. You can simply emit the item with onNext() and emit an error with onError(). As it stands, you're hiding the errors and converting them into emissions, which might make it harder to handle errors downstream.
Call onCompleted() in onResponse() after you call onNext(), to signal completion. In the error case, signaling onError() will itself notify termination of the stream.
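The shape of that change can be modelled without any library. The sketch below is an assumption-laden illustration, not Volley or RxJava code: `CallbackBridgeSketch`, `Sink`, `Request`, `subscribe` and `demo` are hypothetical names, with `Request` standing in for a callback-based network request and `Sink` for a subscriber. In a real project the same three signals would be sent to the subscriber handed to you by one of RxJava's creation methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class CallbackBridgeSketch {

    /** Hypothetical stand-in for an Rx subscriber; not an RxJava type. */
    interface Sink<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onCompleted();
    }

    /** Hypothetical stand-in for a callback-based request with success and error listeners. */
    interface Request<T> {
        void execute(Consumer<T> onResponse, Consumer<Exception> onErrorResponse);
    }

    /** Bridges one callback-based request to a sink: one item then completion, or a single error. */
    static <T> void subscribe(Request<T> request, Sink<T> sink) {
        request.execute(
            response -> {
                sink.onNext(response); // emit the item itself, not an Observable wrapping it
                sink.onCompleted();    // a single request has nothing more to emit
            },
            error -> sink.onError(error) // a real error signal, which also ends the stream
        );
    }

    /** Runs a fake request that either succeeds or fails and returns the signals observed. */
    public static List<String> demo(boolean succeed) {
        List<String> log = new ArrayList<>();
        Request<String> request = (onResponse, onErrorResponse) -> {
            if (succeed) {
                onResponse.accept("payload");
            } else {
                onErrorResponse.accept(new Exception("network down"));
            }
        };
        subscribe(request, new Sink<String>() {
            @Override
            public void onNext(String item) {
                log.add("onNext " + item);
            }

            @Override
            public void onError(Throwable error) {
                log.add("onError " + error.getMessage());
            }

            @Override
            public void onCompleted() {
                log.add("onCompleted");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo(true));  // prints [onNext payload, onCompleted]
        System.out.println(demo(false)); // prints [onError network down]
    }
}
```

Because the success path ends with onCompleted(), a reduce() placed downstream of such a source would receive the signal it is waiting for, and the error path stays a genuine error instead of an ordinary emission.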