Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJava and Cached Data

Tags:

I'm still fairly new to RxJava and I'm using it in an Android application. I've read a metric ton on the subject but still feel like I'm missing something.

I have the following scenario:

I have data stored in the system which is accessed via various service connections (AIDL) and I need to retrieve data from this system (1-n number of async calls can happen). Rx has helped me a ton in simplifying this code. However, this entire process tends to take a few seconds (upwards of 5 seconds+) therefore I need to cache this data to speed up the native app.

The requirements at this point are:

  1. Initial subscription, the cache will be empty, therefore we have to wait the required time to load. No big deal. After that the data should be cached.

  2. Subsequent loads should pull the data from cache, but then the data should be reloaded and the disk cache should be behind the scenes.

The Problem: I have two Observables - A and B. A contains the nested Observables that pull data from the local services (tons going on here). B is much simpler. B simply contains the code to pull the data from disk cache.

Need to solve: a) Return a cached item (if cached) and continue to re-load the disk cache. b) Cache is empty, load the data from system, cache it and return it. Subsequent calls go back to "a".

I've had a few folks recommend a few operations such as flatmap, merge and even subjects but for some reason I'm having trouble connecting the dots.

How can I do this?

like image 705
Donn Felker Avatar asked Nov 14 '14 21:11

Donn Felker


People also ask

How does cache work android?

By caching your app's data and resources, you create a local copy of the information that your app needs to reference. If your app needs to access the same piece of information multiple times over a short time period, you need to download it into the cache only once.

What is RxJava used for?

RxJava is a Java library that enables Functional Reactive Programming in Android development. It raises the level of abstraction around threading in order to simplify the implementation of complex concurrent behavior.

What is RxJava and RxAndroid?

RxAndroid is a RxJava for Android extension that is only used in Android applications. RxAndroid added the Android-required Main Thread. We will need the Looper and Handler for Main Thread execution in order to work with multithreading in Android. Note: AndroidSchedulers are provided by RxAndroid.

What is disk cache in android?

This cache take up your application memory so avoid it for storing huge data. Memory cache gets destroyed once app goes background and gets killed by system to save up resources.


1 Answers

Here are a couple options on how to do this. I'll try to explain them as best I can as I go along. This is napkin-code, and I'm using Java8-style lambda syntax because I'm lazy and it's prettier. :)

  1. A subject, like AsyncSubject, would be perfect if you could keep these as instance states in memory, although it sounds like you need to store these to disk. However, I think this approach is worth mentioning just in case you are able to. Also, it's just a nifty technique to know. AsyncSubject is an Observable that only emits the LAST value published to it (A Subject is both an Observer and an Observable), and will only start emitting after onCompleted has been called. Thus, anything that subscribes after that complete will receive the next value.

    In this case, you could have (in an application class or other singleton instance at the app level):

    public class MyApplication extends Application {         private final AsyncSubject<Foo> foo = AsyncSubject.create();      /** Asynchronously gets foo and stores it in the subject. */     public void fetchFooAsync() {         // Gets the observable that does all the heavy lifting.         // It should emit one item and then complete.         FooHelper.getTheFooObservable().subscribe(foo);     }      /** Provides the foo for any consumers who need a foo. */     public Observable<Foo> getFoo() {         return foo;     }  } 
  2. Deferring the Observable. Observable.defer lets you wait to create an Observable until it is subscribed to. You can use this to allow the disk cache fetch to run in the background, and then return the cached version or, if not in cache, make the real deal.

    This version assumes that your getter code, both cache fetch and non- catch creation, are blocking calls, not observables, and the defer does work in the background. For example:

    public Observable<Foo> getFoo() {     Observable.defer(() -> {         if (FooHelper.isFooCached()) {             return Observable.just(FooHelper.getFooFromCacheBlocking());         }         return Observable.just(FooHelper.createNewFooBlocking());     }).subscribeOn(Schedulers.io()); } 
  3. Use concatWith and take. Here we assume our method to get the Foo from the disk cache either emits a single item and completes or else just completes without emitting, if empty.

    public Observable<Foo> getFoo() {     return FooHelper.getCachedFooObservable()             .concatWith(FooHelper.getRealFooObservable())             .take(1); } 

    That method should only attempt to fetch the real deal if the cached observable finished empty.

  4. Use amb or ambWith. This is probably one the craziest solutions, but fun to point out. amb basically takes a couple (or more with the overloads) observables and waits until one of them emits an item, then it completely discards the other observable and just takes the one that won the race. The only way this would be useful is if it's possible for the computation step of creating a new Foo to be faster than fetching it from disk. In that case, you could do something like this:

    public Observable<Foo> getFoo() {     return Observable.amb(             FooHelper.getCachedFooObservable(),             FooHelper.getRealFooObservable()); } 

I kinda prefer Option 3. As far as actually caching it, you could have something like this at one of the entry points (preferably before we're gonna need the Foo, since as you said this is a long-running operation) Later consumers should get the cached version as long as it has finished writing. Using an AsyncSubject here may help as well, to make sure we don't trigger the work multiple times while waiting for it to be written. The consumers would only get the completed result, but again, that only works if it can be reasonably kept around in memory.

if (!FooHelper.isFooCached()) {     getFoo()         .subscribeOn(Schedulers.io())         .subscribe((foo) -> FooHelper.cacheTheFoo(foo)); } 

Note that, you should either keep around a single thread scheduler meant for disk writing (and reading) and use .observeOn(foo) after .subscribeOn(...), or otherwise synchronize access to the disk cache to prevent concurrency issues.

like image 80
lopar Avatar answered Oct 28 '22 21:10

lopar