Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How do I handle HTTP errors like 401, 403, 503,500 using RxJava Observer instead of Event Bus

I'm using Retrofit, OK-HTTP and RxJava2 to handle network calls, I created below interceptor to handle the Network error response for each network calls, Is there a better way to Handle this? Is this the case for EventBus?

I don't want to check this error exceptionin each method,

//HTTP Client

         OkHttpClient tempClient = new OkHttpClient.Builder()
                            .readTimeout(CONNECT_TIMEOUT_IN_SEC, TimeUnit.SECONDS)// connect timeout
                            .connectTimeout(CONNECT_TIMEOUT_IN_SEC, TimeUnit.SECONDS)// socket timeout
                            .followRedirects(false)
                            .cache(provideHttpCache())
                            .addNetworkInterceptor(new ResponseCodeCheckInterceptor())
                            .addNetworkInterceptor(new ResponseCacheInterceptor())
                            .addInterceptor(new AddHeaderAndCookieInterceptor())
                            .build();

HTTP Client Interceptor

      public class ResponseCodeCheckInterceptor implements Interceptor {
            private static final String TAG = "RespCacheInterceptor";

            @Override
            public Response intercept(Chain chain) throws IOException {
                Response response = chain.proceed(chain.request());
                Request originalRequest = chain.request();
                if (response.code() == HttpStatus.UNAUTHORIZED.value()) {
                      throw new UnAuthorizedException();
                }else if (response.code() == HttpStatus.INTERNAL_SERVER_ERROR.value()) {
                    throw new APIException(response.code(), "Server Internal Error");
                } else if (response.code() == HttpStatus.SERVICE_UNAVAILABLE.value()) {
                  throw new ServiceUnavailableException();
                } else {
                   throw new APIException(code,response.body().toString());
                }
                return response;
            }
        }

API Class

         @GET("customer/account/")
            Single<Customer> getCustomer();
                  ......

Repository Class

        @Override
        public Single<Customer> getCustomer() {
            return this.mCustomerRemoteDataStore.getCustomer()
                    .doOnSuccess(new Consumer<Customer>() {
                        @Override
                        public void accept(Customer customer) throws Exception {
                            if (customer != null) {
                                mCustomerLocalDataStore.saveCustomer(customer);
                            }
                        }
                    }).doOnError(new Consumer<Throwable>() {
                        @Override
                        public void accept(Throwable throwable) throws Exception {

                        }
                    });
        }

Presenter Class

    @Override
    public void getCustomerFullDetails() {
        checkViewAttached();
        getView().showLoading();
        addSubscription(customerRepository.getCustomerFullDetails(true)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(new DisposableSingleObserver<CustomerDetails>() {
                    @Override
                    public void onSuccess(@io.reactivex.annotations.NonNull CustomerDetails customerDetails) {
                        getView().onCustomerDetailsSuccess();
                    }

                    @Override
                    public void onError(@io.reactivex.annotations.NonNull Throwable throwable) {
                        Log.d(TAG, "error: " + throwable.getLocalizedMessage());
                        if (throwable instanceof UnAuthorizedException) {
                            getView().showLoginPage();
                         else if (throwable instanceof ServiceUnavailableException) {
                            getView().showServiceUnAvaiableMsg();
                        }...
                    }
                })
        );
    }

UPDATED CODE ============

    public class CheckConnectivityInterceptor implements Interceptor {

        private static final String TAG = CheckConnectivityInterceptor.class.getSimpleName() ;
        private boolean isNetworkActive;

        private RxEventBus eventBus;

        private Context mContext;

        public CheckConnectivityInterceptor(RxEventBus eventBus, Context mContext) {
            this.mContext = mContext;
            this.eventBus = eventBus;
        }

        @Override
        public Response intercept(Interceptor.Chain chain) throws IOException {
            Request originalRequest = chain.request();
            String requestPath = originalRequest.url().url().getPath();
            if (!NetworkUtil.isConnected(this.mContext)) {
                eventBus.send(new ErrorState(ErrorType.NO_INTERNET_CONNECTION,
                        this.mContext.getString(R.string.no_network_connection), requestPath));
                        //Added this exception so it's not trying to execute the chain
                throw new NoConnectivityException();
            } else {
                Response originalResponse = null;
                try {
                    originalResponse = chain.proceed(chain.request());
                } catch (Exception ex) {
                    eventBus.send(new ErrorState(ErrorType.SERVICE_ERROR, this.mContext.getString(R.string.connection_failed), requestPath));
                    Log.e(TAG, "check connectivity intercept: ",ex );
                    throw new IOException("IO Exception occurred");
                }
                return originalResponse;
            }
        }
    }
   ==================== 
    public class HTTPResponseCodeCheckInterceptor implements Interceptor {

    private RxEventBus eventBus;

     public HTTPResponseCodeCheckInterceptor(RxEventBus eventBus) {
            this.eventBus = eventBus;
        }


     @Override
        public Response intercept(Chain chain) throws IOException {

        if (!responseSuccess) {
                    if (code == HttpStatus.MOVED_TEMPORARILY.value()) {
                        eventBus.send(new ErrorState(ErrorType.STEP_UP_AUTHENTICATION,requestPath,rSecureCode));
                    } else if (code == HttpStatus.INTERNAL_SERVER_ERROR.value()) { // Error code 500
                            eventBus.send(new ErrorState(ErrorType.SERVICE_ERROR, getAPIError(responseStringOrig),requestPath));
                    } else if (code == HttpStatus.SERVICE_UNAVAILABLE.value()) {
                         eventBus.send(new ErrorState(ErrorType.SERVICE_UNAVAILABLE, getOutageMessage(responseStringOrig),requestPath));
                    } else {
                        eventBus.send(new ErrorState(ErrorType.SERVICE_ERROR,new APIErrorResponse(500, "Internal Server Error"),requestPath));
                    }
                }
        }
    }
===================    
    public class RxEventBus {

        private PublishSubject<ErrorState> bus = PublishSubject.create();

        private RxEventBus() {
        }

        private static class SingletonHolder {
            private static final RxEventBus INSTANCE = new RxEventBus();
        }

        public static RxEventBus getBus() {
            return RxEventBus.SingletonHolder.INSTANCE;
        }


        public void send(ErrorState o) {
            bus.onNext(o);
        }

        public Observable<ErrorState> toObserverable() {
            return bus;
        }

        public boolean hasObservers() {
           return bus.hasObservers();
        }

        public  static void register(Object subscriber) {
            //bus.register(subscriber);
        }

        public  static void unregister(Object subscriber) {
          //  bus.unregister(subscriber);
        }
    }

 =====================   
    public class BasePresenter<V extends MVPView> implements MVPPresenter<V> {

      private final CompositeDisposable mCompositeDisposable;

       @Override
        public void subscribe() {
            initRxBus();
        }

        @Override
        public void unsubscribe() {
            RxUtil.unsubscribe(mCompositeDisposable);
        }


     public void addSubscription(Disposable disposable){
            if(mCompositeDisposable != null){
                mCompositeDisposable.add(disposable);
                Log.d(TAG, "addSubscription: "+mCompositeDisposable.size());
            }
        }

    private void initRxBus() {
            addSubscription(EPGApplication.getAppInstance().eventBus()
                    .toObserverable()
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<ErrorState>() {
                        @Override
                        public void accept(ErrorState errorState) throws Exception {
                            if (mMvpView != null) {
                                mMvpView.hideLoadingIndicator();
                                if (ErrorType.STEP_UP_AUTHENTICATION == errorState.getType()) {
                                    mMvpView.showStepUpAuthentication(errorState.getSecureRequestCode());
                                } else if (ErrorType.SERVICE_ERROR == errorState.getType()) {
                                    mMvpView.showServiceError(((APIErrorResponse) errorState.getErrorData()).getErrorMessage());
                                } else if (ErrorType.SERVICE_UNAVAILABLE == errorState.getType()) {
                                    mMvpView.showServiceUnavailable(((OutageBody) errorState.getErrorData()));
                                } else if (ErrorType.UNAUTHORIZED == errorState.getType()) {
                                    mMvpView.sessionTokenExpiredRequestLogin();
                                } else if (ErrorType.GEO_BLOCK == errorState.getType()) {
                                    mMvpView.showGeoBlockErrorMessage();
                                } else if (ErrorType.SESSION_EXPIRED == errorState.getType()) {
                                    mMvpView.sessionTokenExpiredRequestLogin();
                                }else if (ErrorType.NO_INTERNET_CONNECTION == errorState.getType()) {
                                    mMvpView.showNoNetworkConnectivityMessage();
                                    mMvpView.showServiceError(resourceProvider.getString(R.string.no_network_connection));
                                }
                            }
                        }
                    }, new Consumer<Throwable>() {
                        @Override
                        public void accept(Throwable throwable) throws Exception {
                            Log.e(TAG, "base excpetion: ", throwable);
                        }
                    }));
        }
    }
    }
  ==================  
    public class ProfilePresenter<V extends ProfileContract.View> extends BasePresenter<V>
            implements ProfileContract.Presenter<V> {



         public ProfilePresenter(ProfileContract.View view, CustomerRepository repository) {
                super();
                this.repository = repository;
            }


             private void updateCustomerAccountDetails(JSONObject payload) {
                    getMvpView().showLoadingIndicator();
                    addSubscription(repository.updateCustomerDetails(sharedPreferencesRepository.isStepUpAuthRequired(), AppConfig.CUSTOMER_ACCOUNT_HOLDER_UPDATE
                            , payload)
                            .compose(RxUtil.applySingleSchedulers())
                            .subscribeWith(new DisposableSingleObserver<BaseServerResponse>() {
                                @Override
                                public void onSuccess(BaseServerResponse response) {
                                    if (!isViewAttached()) {
                                        return;
                                    }
                                    getMvpView().hideLoadingIndicator();
                                    getMvpView().onSuccessProfileInfoUpdate();
                                }


                                @Override
                                public void onError(Throwable throwable) {

                                    if (!isViewAttached()) {
                                        return;
                                    }

                                    if (throwable instanceof NoConnectivityException) {
                                        getMvpView().showNoNetworkConnectivityMessage();
                                    }
                                    getMvpView().hideLoadingIndicator();
                                }
                            }));
                }
      }
like image 687
Sam Avatar asked Mar 02 '18 03:03

Sam


3 Answers

    Observable.just(new Object())
            .subscribeOn(Schedulers.io())
            .subscribe(new DefaultObserver<Object>() {
                @Override
                public void onNext(Object o) {
                }

                @Override
                public void onError(Throwable e) {

                    if(e instanceof HttpException) {
                        HttpException httpException = (HttpException) e;

                        if(httpException.code() == 400)
                            Log.d(TAG, "onError: BAD REQUEST");
                        else if(httpException.code() == 401)
                            Log.d(TAG, "onError: NOT AUTHORIZED");
                        else if(httpException.code() == 403)
                            Log.d(TAG, "onError: FORBIDDEN");
                        else if(httpException.code() == 404)
                            Log.d(TAG, "onError: NOT FOUND");
                        else if(httpException.code() == 500)
                            Log.d(TAG, "onError: INTERNAL SERVER ERROR");
                        else if(httpException.code() == 502)
                            Log.d(TAG, "onError: BAD GATEWAY");

                    }
                }

                @Override
                public void onComplete() {

                }
            });

Please make a note that, all responses with code 2xx will be consumed in onNext and codes starts with 4xx and 5xx will be consumed in onError.

FYI. ResponseCodeCheckInterceptor is not required in this case. just give a try without custom interceptor and this should do the trick.

UPDATE

Custom Observer

public abstract class CustomObserver extends DefaultObserver implements Observer{

@Override
public void onNext(Object o) {

}

@Override
public void onError(Throwable e) {
    if(e instanceof HttpException) {
        HttpException httpException = (HttpException) e;

        if(httpException.code() == 400)
            onBadRequest(e);
        else if(httpException.code() == 401)
            onNotAuthorized(e);
        else if(httpException.code() == 502)
            onBadGateway(e);

    }
}

public abstract void onNotAuthorized(Throwable e);

public abstract void onBadGateway(Throwable e);

public abstract void onBadRequest(Throwable e);

@Override
public void onComplete() {

}

}

Implementation

Observable.just(new Object())
            .subscribeOn(Schedulers.io())
            .subscribe(new CustomObserver() {
                @Override
                public void onNext(Object o) {
                    super.onNext(o);
                }

                @Override
                public void onError(Throwable e) {
                    super.onError(e);
                }

                @Override
                public void onNotAuthorized(Throwable e) {

                }

                @Override
                public void onBadGateway(Throwable e) {

                }

                @Override
                public void onBadRequest(Throwable e) {

                }

                @Override
                public void onComplete() {
                    super.onComplete();
                }
            });
like image 161
Aks4125 Avatar answered Nov 14 '22 22:11

Aks4125


one approach i've used in the past is to share a Subject used for communicating error conditions.

  • on the producer side, hold a reference to it a Subject type (Publish/Behavior/Replay/etc) so as to invoke onNext() with the next error the application has incurred. in this case the producer would be the ResponseCodeCheckInterceptor instance. rather than throwing the various exceptions, this instance would instead emit an ErrorState describing the error that just occurred. (assume ErrorState to be a custom type that carries just enough information about an error condition for consumers to decide how to react, e.g. update the UI, clean-up resources, etc).

  • on the consumer side, hold a reference to the shared Subject as an Observable<ErrorState>. as you seem to be doing MVP, the Presenter would likely be one of your consumers.

(dependency injection is a good way to go about sharing the Subject instance).

hope that helps!

Update with some rough sample code...

// this is descriptive way to identify unique error types you may care about
enum ErrorType {
    Unauthorized,
    ServiceUnavailable,
    ServiceError
}

// this class should bundle together enough information about an 
// error that has occurred so consumers can decide how to respond
// e.g. is this a "fatal" error, or can the associated operation be 
// retried?
class ErrorState {
    public final ErrorType type;
    public final boolean fatal;

    ErrorState(ErrorType type, boolean fatal) {
        this.type  = type;
        this.fatal = fatal;
    }
}

// the Interceptor creates new instances of ErrorState and pushes 
// them through the Subject to notify downstream subscribers
class ResponseCodeCheckInterceptor implements Interceptor {
    private final Subject<ErrorState> errorStateSubject;

    ResponseCodeCheckInterceptor(Subject<ErrorState> errorStateSubject) {
        this.errorStateSubject = errorStateSubject;
    }

    @Override
    public Response intercept(@NonNull Chain chain) throws IOException {
        final Response response = chain.proceed(chain.request());

        if(response.code() == HttpStatus.UNAUTHORIZED.value()) {
            errorStateSubject.onNext(new ErrorState(ErrorType.Unauthorized, false));
        } else if (response.code() == HttpStatus.INTERNAL_SERVER_ERROR.value()) {
            errorStateSubject.onNext(new ErrorState(ErrorType.ServiceError, true));
        } else if (response.code() == HttpStatus.SERVICE_UNAVAILABLE.value()) {
            errorStateSubject.onNext(new ErrorState(ErrorType.ServiceUnavailable, false));
        }

        return response;
    }
}

// the Presenter is created with a reference to the same Subject, but 
// retains a reference to it as an Observable. the Consumer instance 
// supplied as the onNext() handler is where you'd put your logic to 
// handle ErrorStates. also, the Presenter should be lifecycle-aware
// so as to create and dispose of subscriptions at the appropriate
// times.
class Presenter {
    private final Observable<ErrorState> errorStateStream;

    private Disposable errorStateSubscription;

    class Presenter(Observable<ErrorState> errorStateStream) {
        this.errorStateStream = errorStateStream
    }

    public void onStart() {
        errorStateSubscription = errorStateStream.subscribe(
                next  -> { 
                    /* Invoke views/etc */ 
                },
                error -> { 
                    /* handle stream error during */ 
                }
        );
    }

    public void onStop() {
        if(errorStateSubscription != null) {
            errorStateSubscription.dispose();
        }
    }
}
like image 24
homerman Avatar answered Nov 14 '22 23:11

homerman


Kotlin way

If you use Kotlin and MVVM I suggest another simple way

You can use Kotlin extensions like below:

fun <T> Single<T>.handelNetworkError() =
    onErrorResumeNext { e ->
        when (e) {
            is OfflineException -> return@onErrorResumeNext Single.error(Exception("check your internet connection "))
            is SocketTimeoutException -> return@onErrorResumeNext Single.error(Exception("server not fount"))
            is retrofit2.HttpException -> {
                val responseBody = e.response().errorBody()
///if you want more custom error:
 /* when(e.response().code()){
                    500 ->  return@onErrorResumeNext Single.error( customException500(responseBody?.run { getErrorMessage(responseBody) }))
                    402 ->  return@onErrorResumeNext Single.error(customException402(responseBody?.run { getErrorMessage(responseBody) }))
            .... ->    }  */

                return@onErrorResumeNext Single.error(Exception(responseBody?.run { getErrorMessage(responseBody) }))
            }
            is IOException -> return@onErrorResumeNext Single.error(Exception("Network error"))
            else -> return@onErrorResumeNext Single.error(e)
        }
    }

fun <T> Observable<T>.handelNetworkError() =
    onErrorResumeNext { e : Throwable ->
        when (e) {
            is OfflineException -> return@onErrorResumeNext Observable.error(Exception("check your internet connection"))
            is SocketTimeoutException -> return@onErrorResumeNext Observable.error(Exception("server not fount"))
            is retrofit2.HttpException -> {
                val responseBody = e.response().errorBody()

                return@onErrorResumeNext Observable.error(Exception(responseBody?.run { getErrorMessage(responseBody) }))
            }
            is IOException -> return@onErrorResumeNext Observable.error(Exception("Network error"))
            else -> return@onErrorResumeNext Observable.error(e)
        }
    }


fun getErrorMessage(responseBody: ResponseBody): String? {
    return try {
        val jsonObject = JSONObject(responseBody.string())
        jsonObject.getString("message")
    } catch (e: Exception) {
        e.message
    }

}

class OfflineException : IOException() {
    override val message: String?
        get() = "no internet!"
}

Usage:

api.getUserList().handelNetworkError().subscribe { }

like image 30
amin mahmoudi Avatar answered Nov 14 '22 23:11

amin mahmoudi