I'm trying to implement this workflow with RxJava, but I'm not sure whether I'm misusing it or doing something wrong.
Here is my full snippet of code.
public class LoginTask extends BaseBackground<LoginResult> {
  private static CachedLoginResult cachedLoginResult = new CachedLoginResult();
  private XMLRPCClient xmlrpcClient;
  private UserCredentialsHolder userCredentialsHolder;
  @Inject
  public LoginTask(XMLRPCClient client, UserCredentialsHolder userCredentialsHolder) {
    this.xmlrpcClient = client;
    this.userCredentialsHolder = userCredentialsHolder;
  }
  @Override
  public LoginResult performRequest() throws Exception {
    return UserApi.login(
        xmlrpcClient,
        userCredentialsHolder.getUserName(),
        userCredentialsHolder.getPlainPassword());
  }
  @Override
  public Observable<LoginResult> getObservable() {
    return cachedLoginResult.getObservable()
        .onErrorResumeNext(
            Observable.create(
                ((Observable.OnSubscribe<LoginResult>) subscriber -> {
                  try {
                    if (!subscriber.isUnsubscribed()) {
                      subscriber.onNext(performRequest()); // actually performRequest
                    }
                    subscriber.onCompleted();
                  } catch (Exception e) {
                    subscriber.onError(e);
                  }
                })
            )
                .doOnNext(cachedLoginResult::setLoginResult)
                .retry((attempts, t) -> attempts < 3)
                .doOnError(throwable -> cachedLoginResult.purgeCache())
        );
  }
  private static class CachedLoginResult {
    private LoginResult lr = null;
    private long when = 0;
    private CachedLoginResult() {
    }
    public boolean hasCache() {
      return lr != null && when + TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES) > System.currentTimeMillis();
    }
    public void setLoginResult(LoginResult lr) {
      if (lr != null) {
          this.lr = lr;
          this.when = System.currentTimeMillis();
      }
    }
    public void purgeCache() {
      this.lr = null;
      this.when = 0;
    }
    public Observable<LoginResult> getObservable() {
      return Observable.create(new Observable.OnSubscribe<LoginResult>() {
        @Override
        public void call(Subscriber<? super LoginResult> subscriber) {
          if (!subscriber.isUnsubscribed()) {
            if (hasCache()) {
              subscriber.onNext(lr);
              subscriber.onCompleted();
            } else {
              subscriber.onError(new RuntimeException("No cache"));
            }
          }
        }
      });
    }
  }
}
Since I wasn't able to find any similar examples, and I started "playing" with RxJava just one day ago, I'm unsure of my implementation.
Thank you for your time.
I think this code is alright, good job :)
You were right to use Observable.create in your LoginTask, because otherwise the result of the call could be cached internally, and then retry wouldn't help much: it would resubscribe without calling performRequest again.
I think Observable.create is unnecessary, however, for the CachedLoginResult's Observable. There you can simplify your code by using the Observable.just and Observable.error utility methods, something like:
public Observable<LoginResult> getObservable() {
  if (hasCache()) {
      return Observable.just(lr);
  } else {
      return Observable.error(new RuntimeException("No cache"));
  }
}
Note: just stores the value you tell it to emit internally, so every resubscription produces that same value. This is what I hinted at above: you shouldn't do Observable.just(performRequest()).retry(3), for example, because performRequest is evaluated as an ordinary method argument before the Observable exists, so it will only ever be called once no matter how often retry resubscribes.
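To make the eager-versus-lazy point concrete, here is a minimal plain-Java sketch that deliberately avoids RxJava so it runs with the JDK alone. The class name, the fail-twice performRequest stand-in, and the retry helper are all hypothetical and only loosely mimic the operator. In RxJava 1.x, the lazy side corresponds to your Observable.create version, or to Observable.defer / Observable.fromCallable.

```java
import java.util.concurrent.Callable;

public class EagerVsLazyRetry {
    static int calls = 0;

    // Hypothetical stand-in for performRequest(): fails twice, then succeeds.
    static String performRequest() {
        calls++;
        if (calls < 3) {
            throw new IllegalStateException("attempt " + calls + " failed");
        }
        return "logged in on attempt " + calls;
    }

    // Simplified retry (an assumption, not RxJava's operator):
    // invokes the source again after each failure, up to maxAttempts times.
    static <T> T retry(Callable<T> source, int maxAttempts) throws Exception {
        Exception last = new IllegalArgumentException("maxAttempts must be positive");
        for (int i = 0; i < maxAttempts; i++) {
            try {
                return source.call();
            } catch (Exception e) {
                last = e;
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        // Lazy: the request runs inside the source, so each retry re-runs it.
        calls = 0;
        System.out.println(retry(EagerVsLazyRetry::performRequest, 3));

        // Eager: the request runs before retry() is ever entered,
        // so the first failure escapes and nothing is retried.
        calls = 0;
        try {
            String value = performRequest();
            System.out.println(retry(() -> value, 3));
        } catch (IllegalStateException e) {
            System.out.println("eager version failed: " + e.getMessage());
        }
    }
}
```

The lazy call succeeds on the third attempt, while the eager one fails on the first attempt because the retry logic never gets a chance to invoke the request again.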