Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Fatal Exception: java.io.InterruptedIOException: thread interrupted while using retrofit with rxjava

I am getting this random exception that crashes my app while making network calls with retrofit and am looking for some guidance for resolution

Fatal Exception: java.io.InterruptedIOException: thread interrupted
       at okio.Timeout.throwIfReached(Timeout.java:145)
       at okio.Okio$1.write(Okio.java:76)
       at okio.AsyncTimeout$1.write(AsyncTimeout.java:180)
       at okio.RealBufferedSink.flush(RealBufferedSink.java:216)
       at okhttp3.internal.http2.Http2Writer.flush(Http2Writer.java:121)
       at okhttp3.internal.http2.Http2Connection.newStream(Http2Connection.java:239)
       at okhttp3.internal.http2.Http2Connection.newStream(Http2Connection.java:205)
       at okhttp3.internal.http2.Http2Codec.writeRequestHeaders(Http2Codec.java:111)
       at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:50)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
       at com.happycorp.happy.happyapp.util.Network$1.intercept(Network.java:80)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
       at com.happycorp.android.commondata.net.RetrofitFactory$CustomHttpMetricsLogger.intercept(RetrofitFactory.java:139)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
       at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
       at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
       at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
       at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:125)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
       at com.happycorp.android.commondata.net.RetrofitFactory$1.intercept(RetrofitFactory.java:83)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
       at okhttp3.logging.HttpLoggingInterceptor.intercept(HttpLoggingInterceptor.java:212)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
       at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
       at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:200)
       at okhttp3.RealCall.execute(RealCall.java:77)
       at retrofit2.OkHttpCall.execute(OkHttpCall.java:180)
       at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:41)
       at io.reactivex.Observable.subscribe(Observable.java:10179)
       at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34)
       at io.reactivex.Observable.subscribe(Observable.java:10179)
       at io.reactivex.internal.operators.observable.ObservableSingleSingle.subscribeActual(ObservableSingleSingle.java:35)
       at io.reactivex.Single.subscribe(Single.java:2558)
       at io.reactivex.internal.operators.single.SingleToFlowable.subscribeActual(SingleToFlowable.java:37)
       at io.reactivex.Flowable.subscribe(Flowable.java:12218)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onNext(FlowableFlatMap.java:156)
       at io.reactivex.internal.operators.flowable.FlowableFromIterable$IteratorSubscription.slowPath(FlowableFromIterable.java:238)
       at io.reactivex.internal.operators.flowable.FlowableFromIterable$BaseRangeSubscription.request(FlowableFromIterable.java:123)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onSubscribe(FlowableFlatMap.java:110)
       at io.reactivex.internal.operators.flowable.FlowableFromIterable.subscribe(FlowableFromIterable.java:68)
       at io.reactivex.internal.operators.flowable.FlowableFromIterable.subscribeActual(FlowableFromIterable.java:46)
       at io.reactivex.Flowable.subscribe(Flowable.java:12218)
       at io.reactivex.internal.operators.flowable.FlowableFlatMap.subscribeActual(FlowableFlatMap.java:52)
       at io.reactivex.Flowable.subscribe(Flowable.java:12218)
       at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.run(FlowableSubscribeOn.java:82)
       at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
       at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
       at java.util.concurrent.FutureTask.run(FutureTask.java:237)
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
       at java.lang.Thread.run(Thread.java:761)

This is what the service interface looks like:

@GET("categories")
Single<Category> getCategories();

@GET("categories/{categoryId}")
Single<Category> getCategory(@Path("categoryId") Integer id);

@GET("categories")
Single<Category> getCategoriesByRootCategory(@Query("rootCategoryId") Integer id);

@GET("products")
Single<ProductSearchResult> getProductsBySingleFilter(@Query(SINGLE_FILTER_BASE + "[field]")
        String searchCriteria,
        @Query(SINGLE_FILTER_BASE + "[conditionType]")
                String conditionType,
        @Query(SINGLE_FILTER_BASE + "[value]") String value);

Here is the code that creates the okhttp client and the retrofit interface:

public static OkHttpClient getOkHttpClient(@Nullable final File parentCacheDir,
        @NonNull final AuthProvider authProvider,
        @NonNull final ApiEnvironment apiEnvironment,
        @Nullable final Interceptor cacheInterceptor) {

    Cache cache = null;
    if (parentCacheDir != null) {
        File responseCacheDirectory = new File(parentCacheDir, RESPONSE_CACHE_DIRECTORY);
        cache = new Cache(responseCacheDirectory, CACHE_SIZE);
    }

    OkHttpClient.Builder builder = new OkHttpClient.Builder()
            .connectTimeout(NETWORK_REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS)
            .readTimeout(NETWORK_REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS)
            .writeTimeout(NETWORK_REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS)
            .cache(cache);

    HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor();
    loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.HEADERS);

    builder.addInterceptor(loggingInterceptor);

    builder.addInterceptor(new Interceptor() {
        @Override
        public Response intercept(Chain chain) throws IOException {
            Request.Builder requestBuilder = chain.request().newBuilder();
            requestBuilder.header(KEY_CONTENT_TYPE, APPLICATION_JSON);
            requestBuilder = addAuthHeaders(requestBuilder, apiEnvironment);
            if (apiEnvironment.getDefaultRequestParams() != null) {
                requestBuilder = addDefaultParams(requestBuilder, apiEnvironment);
            }
            return chain.proceed(requestBuilder.build());
        }
    });

    builder.addNetworkInterceptor(new CustomHttpMetricsLogger());

    if (cacheInterceptor != null) {
        builder.addNetworkInterceptor(cacheInterceptor);
    }

    return builder.build();
}

public static Retrofit newInstance(@NonNull final AuthProvider authProvider,
        @NonNull final ApiEnvironment apiEnvironment,
        @Nullable final File parentCacheDir,
        @Nullable final Interceptor debugInterceptor) {
    return new Retrofit.Builder()
            .addConverterFactory(GsonConverterFactory.create(getGson()))
            .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
            .client(getOkHttpClient(parentCacheDir, authProvider, apiEnvironment, debugInterceptor))
            .baseUrl(apiEnvironment.getBaseUrl())
            .build();
}

And here is the code that is generating the exception:

public static Flowable<ProductSearchResult> getAllItemsForCategories(final HebMagentoApi magentoService,
                                                                     List<Category> categories) {
    return Flowable.fromIterable(categories).flatMap(category ->
            magentoService.getProductsBySingleFilter(HebMagentoApi.MAGENTO_CATEGORY_ID,
                    HebMagentoApi.MAGENTO_FILTER_EQUAL,
                    String.valueOf(category.getId())).toFlowable());
}

I think this issue is related to this line in the adapter code:

https://github.com/square/retrofit/blob/master/retrofit-adapters/rxjava2/src/main/java/retrofit2/adapter/rxjava2/BodyObservable.java#L59

like image 360
dazza5000 Avatar asked Feb 14 '18 05:02

dazza5000


1 Answers

The issue was with how we are creating the RxJava2CallAdapterFactory. We were using Dagger 2 to create the okhttp client and as far as I know that graph was constructed on the main thread and so the rxjava2calladapterfactory would use the main thread by default if we were creating flowables / observables by hand using retrofit.

To fix this so that all of the calls to Retrofit using RxJava2 happen on a background thread we create the call adapter in the the following way:

RxJavaCallAdapterFactory.createWithScheduler(Schedulers.io())

IMPORTANT: The other thing to check to keep your app from crashing is to make sure that you are defining onError functions for each of your RxJava operators: flatMap, map, etc

like image 170
dazza5000 Avatar answered Nov 07 '22 03:11

dazza5000