I'm having trouble finding an example of how to make a custom operator with RxJava 2. I've considered a few approaches:
1. Using Observable.create, and then flatMapping on it from the source observable. I can get this working, but it doesn't quite feel right. I end up creating a static function to which I pass the source Observable, and then I flatMap on the source. In the OnSubscribe, I then instantiate an object that I pass the emitter to, which handles and manages the Observable / Emitter (as it's not trivial, and I want everything as encapsulated as possible).
2. Implementing ObservableOperator and providing it to Observable.lift. I can't find any examples of this for RxJava 2. I had to debug into my own example to make sure my understanding of upstream and downstream was correct. Because I can't find any examples or documentation on this for RxJava 2, I'm a little worried I might accidentally do something I'm not supposed to.
3. Extending the Observable type. This seems to be how the underlying operators work, many of which extend AbstractObservableWithUpstream. There is a lot going on here though, and it seems easy to miss something or do something I shouldn't. I'm not sure if I'm supposed to take an approach like this or not. I stepped through the mental process, and it seems like it can get hairy pretty quickly.
I'm going to proceed with option #2, but thought it worthwhile to ask what the supported method for doing this is in RxJava 2, and whether there is any documentation or examples for it.
Writing operators is not recommended for beginners and many desired flow patterns can be achieved via existing operators.
Have you looked at RxJava's wiki about writing operators for 2.x? I suggest reading it from top to bottom.
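As a rough illustration of the point about existing operators, a reusable step can often be packaged as an ObservableTransformer and applied with compose(), with no custom operator involved. The class and method names below (EvenSquares, evenSquares) are made up for this sketch; only filter, map, compose and ObservableTransformer come from RxJava 2.

```java
import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;

// Hypothetical holder class; the name is an assumption for this sketch.
final class EvenSquares {
    private EvenSquares() { }

    // A reusable step built only from existing operators:
    // keep the even numbers, then square them.
    static ObservableTransformer<Integer, Integer> evenSquares() {
        return upstream -> upstream
                .filter(n -> n % 2 == 0)
                .map(n -> n * n);
    }
}
```

It would be applied as Observable.range(1, 5).compose(EvenSquares.evenSquares()). Because the transformer only chains built-in operators, it inherits their handling of disposal and errors.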
1. Using create() is possible, but most people use it to emit the elements of a List with a for-each loop, not recognizing that Flowable.fromIterable does that.
2. RxJava 2's built-in operators don't use lift() themselves. If you want to avoid some of the boilerplate of option 3, you may try this route.
3. AbstractObservableWithUpstream is a small convenience and not necessary for external implementors.
This may help you. I implemented an RxJava 2 operator to handle API errors, using the lift operator.
See the example.
  public final class ApiClient implements ApiClientInterface {
    ...
      @NonNull
      @Override
      public Observable<ActivateResponse> activate(String email, EmailData emailLinkData) {
          return myApiService.activate(email, emailLinkData)
                  .lift(getApiErrorTransformer())
                  .subscribeOn(Schedulers.io());
      }
      private <T> ApiErrorOperator<T> getApiErrorTransformer() {
          return new ApiErrorOperator<>(gson, networkService);
      }
  }
And here is the custom operator:
    public final class ApiErrorOperator<T> implements ObservableOperator<T, T> {
        private static final String TAG = "ApiErrorOperator";
        private final Gson gson;
        private final NetworkService networkService;
        public ApiErrorOperator(@NonNull Gson gson, @NonNull NetworkService networkService) {
            this.gson = gson;
            this.networkService = networkService;
        }
        @Override
        public Observer<? super T> apply(Observer<? super T> observer) throws Exception {
            return new Observer<T>() {
                @Override
                public void onSubscribe(Disposable d) {
                    observer.onSubscribe(d);
                }
                @Override
                public void onNext(T value) {
                    observer.onNext(value);
                }
                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "onError", e);
                    if (e instanceof HttpException) {
                        try {
                            HttpException error = (HttpException) e;
                            Response response = error.response();
                            String errorBody = response.errorBody().string();
                            ErrorResponse errorResponse = gson.fromJson(errorBody.trim(), ErrorResponse.class);
                            ApiException exception = new ApiException(errorResponse, response);
                            observer.onError(exception);
                        } catch (IOException exception) {
                            observer.onError(exception);
                        }
                    } else if (!networkService.isNetworkAvailable()) {
                        observer.onError(new NetworkException(ErrorResponse.builder()
                                .setErrorCode("")
                                .setDescription("No Network Connection Error")
                                .build()));
                    } else {
                        observer.onError(e);
                    }
                }
                @Override
                public void onComplete() {
                    observer.onComplete();
                }
            };
        }
    }
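For comparison with the lift()-based operator above, here is a minimal sketch of option 3 from the question, extending Observable directly. It is an illustration under assumptions, not how any particular RxJava operator is written: ObservableMapSketch and MapObserver are hypothetical names, and the operator simply maps each item. The intermediate observer also implements Disposable so that a downstream dispose() is passed on to the upstream.

```java
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Function;
import io.reactivex.plugins.RxJavaPlugins;

// Hypothetical map-like operator written by extending Observable (option 3).
final class ObservableMapSketch<T, R> extends Observable<R> {
    private final ObservableSource<T> source;
    private final Function<? super T, ? extends R> mapper;

    ObservableMapSketch(ObservableSource<T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    protected void subscribeActual(Observer<? super R> downstream) {
        // Each subscriber gets its own intermediate observer.
        source.subscribe(new MapObserver<T, R>(downstream, mapper));
    }

    static final class MapObserver<T, R> implements Observer<T>, Disposable {
        private final Observer<? super R> downstream;
        private final Function<? super T, ? extends R> mapper;
        private Disposable upstream;
        private boolean done;

        MapObserver(Observer<? super R> downstream, Function<? super T, ? extends R> mapper) {
            this.downstream = downstream;
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Disposable d) {
            upstream = d;
            // Hand this observer downstream so dispose() calls pass through it.
            downstream.onSubscribe(this);
        }

        @Override
        public void onNext(T item) {
            if (done) {
                return;
            }
            R result;
            try {
                result = mapper.apply(item);
                if (result == null) {
                    throw new NullPointerException("mapper returned null");
                }
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                // Stop the upstream and signal the failure downstream.
                upstream.dispose();
                onError(ex);
                return;
            }
            downstream.onNext(result);
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                // A terminal event was already delivered; route to the global handler.
                RxJavaPlugins.onError(e);
                return;
            }
            done = true;
            downstream.onError(e);
        }

        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            downstream.onComplete();
        }

        @Override
        public void dispose() {
            upstream.dispose();
        }

        @Override
        public boolean isDisposed() {
            return upstream.isDisposed();
        }
    }
}
```

A quick check could be new ObservableMapSketch<Integer, Integer>(Observable.just(1, 2, 3), x -> x * 10).test(), which returns a TestObserver to assert on. The extra bookkeeping here (the done flag, forwarding dispose) is the boilerplate that the lift() route avoids only partly, since the Observer passed to lift() still has to follow the same protocol.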