In an Android project that uses RxJava 2, I create a Flowable
like this in the onCreate
of my initial activity:
Flowable.create(new MyFlowableOnSubscribe1(), BackpressureStrategy.BUFFER)
.doOnComplete(new MyAction())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new MySubscriber());
The implementation of the FlowableOnSubscribe is:
public class MyFlowableOnSubscribe1 implements FlowableOnSubscribe<String> {
public static final String TAG = "XX MyFlOnSub1";
@Override
public void subscribe(FlowableEmitter<String> emitter) {
Log.i(TAG, "subscribe");
emitter.onNext("hello");
emitter.onComplete();
}
}
This is the subscriber implementation:
public class MySubscriber implements Subscriber<String> {
public static final String TAG = "XX MySubscriber";
@Override
public void onSubscribe(Subscription s) {
Log.i(TAG, "onSubscribe");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: " + s);
}
}
And the action implementation is:
public class MyAction implements Action {
public static final String TAG = "XX MyAction";
@Override
public void run() {
Log.i(TAG, "run");
}
}
In my output, I'm expecting to a log statement from onNext
, but I don't see one. Instead, this is my entire output:
02-23 17:56:31.334 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onSubscribe
02-23 17:56:31.334 24176-24219/com.ebelinski.rxjavaexperimentproject I/XX MyFlOnSub1: subscribe
02-23 17:56:31.334 24176-24219/com.ebelinski.rxjavaexperimentproject I/XX MyAction: run
This indicates that onNext
never runs, and onComplete
doesn't even run either. But MyAction
runs successfully.
Here's what happens when I comment out the call to onNext
:
02-23 17:58:31.572 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onSubscribe
02-23 17:58:31.572 24176-26715/com.ebelinski.rxjavaexperimentproject I/XX MyFlOnSub1: subscribe
02-23 17:58:31.572 24176-26715/com.ebelinski.rxjavaexperimentproject I/XX MyAction: run
02-23 17:58:31.652 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onComplete
In this case onNext
of course doesn't run, but at least onComplete
runs.
I expected that I would see onComplete
run in both cases, and onNext
run when I call emitter.onNext
. What am I doing wrong here?
You need to manually issue a request otherwise no data will be emitted when extending Subscriber
directly:
@Override
public void onSubscribe(Subscription s) {
Log.i(TAG, "onSubscribe");
s.request(Long.MAX_VALUE);
}
Alternatively, you could extend DisposableSubscriber
or ResourceSubscriber
.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With