I've an Android app with multiple observers of type A that subscribe with several Observables of type B. The subscription is done in IO Scheduler and the observation on the Android main thread.
The problem that I've is that randomlly after some work one message emitted by B is never received in A and after some hours of debbuging I can't find the cause.
Relevant code when the problem happens:
"NEXT1" and "NEXT2" are printed but "RECEIVED","ERROR", COMPLETED aren't.
//The subscription
B.getMessate()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(A);
//B
Observable<msg> getMessage() {
return Observable.create(new Observable.OnSubscribe<msg>() {
public void call(Subscriber<? super msg> subscriber) {
...
subscriber.onNext(msg)
println("NEXT1")
}
}).doOnNext({ (o) -> println("NEXT2")});
}
//A
onNext(msg) {
//Never called when problem happens
println("RECEIVED")
}
onError(msg) {
//Never called when problem happens
println("ERROR")
}
onError(msg) {
//Never called when problem happens
println("COMPLETED")
}
Anyone has some clue? or any recommendation for debugging that?
What I've checked:
As a brief note to self, when you need to debug a chain of RxJava Observable method calls, you can use the doOnNext method to log the current values or print them to STDOUT or STDERR with println . Here's an example from RxJava For Android Developers, where the debug output is logged with the Android Log.
RxJava is a Java VM implementation of ReactiveX a library for composing asynchronous and event-based programs by using observable sequences. The building blocks of RxJava are Observables and Subscribers. Observable is used for emitting items and Subscriber is used for consuming those items.
RxJava is a Java library that enables Functional Reactive Programming in Android development. It raises the level of abstraction around threading in order to simplify the implementation of complex concurrent behavior.
RxAndroid is a RxJava for Android extension that is only used in Android applications. RxAndroid added the Android-required Main Thread. We will need the Looper and Handler for Main Thread execution in order to work with multithreading in Android. Note: AndroidSchedulers are provided by RxAndroid.
By now I can't reproduce the problem but I've found RxJavaDebug a very good tool for debugging.
It's use is simple: add the library as a dependency and at application start register a listener:
RxJavaPlugins.getInstance().registerObservableExecutionHook(new DebugHook(new DebugNotificationListener() {
public Object onNext(DebugNotification n) {
Log.v(TAG, "onNext on " + n);
return super.onNext(n);
}
public Object start(DebugNotification n) {
Log.v(TAG, "start on " + n);
return super.start(n);
}
public void complete(Object context) {
Log.v(TAG, "complete on " + context);
}
public void error(Object context, Throwable e) {
Log.e(TAG, "error on " + context);
}
}));
That will log messages while they go between observables and operators.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With