So I am trying to implement instant search using rxjava2 and retrofit,
the process is simple, as soon as the user changes the text publish.onNext()
is called (publish is a PublishSubject
object).
I have added filter and debounce and switch map operators to facilitate search from the server when text's length is greater than a threshold and call is not made with successive input simultaneously.
This is the code :
subject = PublishSubject.create();
getCompositeDisposable().add(subject
.filter(s -> s.length() >= 3)
.debounce(300,
TimeUnit.MILLISECONDS)
.switchMap(s -> getDataManager().getHosts(
getDataManager().getDeviceToken(),
s).observeOn(Schedulers.io()))
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(hostResponses -> {
getMvpView().hideEditLoading();
if (hostResponses.size() != 0) {
if (this.hostResponses != null)
this.hostResponses.clear();
this.hostResponses = hostResponses;
getMvpView().setHostView(getHosts(hostResponses));
} else {
getMvpView().onFieldError("No host found");
}
}, throwable -> {
getMvpView().hideEditLoading();
if (throwable instanceof HttpException) {
HttpException exception = (HttpException)throwable;
if (exception.code() == 401) {
getMvpView().onError(R.string.code_expired,
BaseUtils.TOKEN_EXPIRY_TAG);
}
}
})
);
Now my code is working fine, I am achieving what I need but I am getting a bug when I enter a long string and press backspace button, what happens is that when my AutoCompleteTextView's text is cleared, an exception is thrown
Here is the stacktrace of the exception :
java.io.InterruptedIOException: thread interrupted
at okio.Timeout.throwIfReached(Timeout.java:145)
at okio.Okio$1.write(Okio.java:76)
at okio.AsyncTimeout$1.write(AsyncTimeout.java:180)
at okio.RealBufferedSink.flush(RealBufferedSink.java:216)
at okhttp3.internal.http1.Http1Codec.finishRequest(Http1Codec.java:166)
at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:84)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
at com.facebook.stetho.okhttp3.StethoInterceptor.intercept(StethoInterceptor.java:56)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:125)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:200)
at okhttp3.RealCall.execute(RealCall.java:77)
at retrofit2.OkHttpCall.execute(OkHttpCall.java:180)
at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:41)
at io.reactivex.Observable.subscribe(Observable.java:10700)
at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34)
at io.reactivex.Observable.subscribe(Observable.java:10700)
at io.reactivex.internal.operators.observable.ObservableObserveOn.subscribeActual(ObservableObserveOn.java:45)
at io.reactivex.Observable.subscribe(Observable.java:10700)
at io.reactivex.internal.operators.observable.ObservableSwitchMap$SwitchMapObserver.onNext(ObservableSwitchMap.java:126)
at io.reactivex.observers.SerializedObserver.onNext(SerializedObserver.java:111)
at io.reactivex.internal.operators.observable.ObservableDebounceTimed$DebounceTimedObserver.emit(ObservableDebounceTimed.java:140)
at io.reactivex.internal.operators.observable.ObservableDebounceTimed$DebounceEmitter.run(ObservableDebounceTimed.java:165)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
at java.util.concurrent.FutureTask.run(FutureTask.java:237)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
at java.lang.Thread.run(Thread.java:762
That inner observeOn(Schedulers.io())
doesn't look right, given that you immediately move the element back to the main thread after that. It should be subscribeOn(Schedulers.io())
there instead.
Also remove the subscribeOn()
call just before your subscribe
call as it should have no practical effect given that the chain is subscribed to a PublishSubject
at the top.
.switchMap(s -> getDataManager()
.getHosts(getDataManager().getDeviceToken(), s)
// .observeOn(Schedulers.io())
.subscribeOn(Schedulers.io()) // <-------------------------
)
.observeOn(AndroidSchedulers.mainThread())
//.subscribeOn(Schedulers.io()) // <--------------------------------------
.subscribe(hostResponses -> {
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With