
Using RxJava to chain a series of operations - where to go next?

Scenario

I have built an API for my app that sits behind a gateway with request throttling. Before I built the API, my app coordinated requests itself and could therefore start many requests within milliseconds to synchronize data across the 9 providers it fetches from. Now that this logic has been pushed up into my API adapter layer, I need to think about how to control the number of requests per second to avoid hitting my own rate limits. Increasing the rate limit is not an option, since it would require a tier bump with the gateway provider that I am not willing to pay for.

I have started using RxJava

In a bid to upskill in this strong movement within the Java community, I have opted to use RxJava, together with Retrofit and Retrolambda, for the API SDK I have built. This has been largely successful and is running live without issue.

My app

Now, my app allows users to save 'spots' which, when synchronized, retrieve the local weather, tide and swell conditions for that area. Each spot uses 4 API resources to get a complete data set, specifically;

/luna/locations/xtide/{id} - Luna Event detail (read: tide times)
/solar/locations/xtide/{id} - Solar Event detail (read: sunrise/sunset)
/water/locations/{provider}/{id}{?daysData} - Water Event detail (read: swell measures)
/meteo/wwo/weather{?query,daysData} - Meteo Event detail (read: weather data)

The app permits any number of spots, n, meaning that with the current code a full sync issues 4n requests (4 per spot). For example, if I have 10 spots saved and attempt to sync them all, I will cause 4*10 = 40 API requests to be fired in around 0.75s!
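To put rough numbers on this, here is a small sketch of the arithmetic. The class name SyncBudget and the limit of 10 requests per second are assumptions for illustration only, since the post does not state the gateway's actual limit. The tick it computes keeps the average rate at or below the limit; a gateway that counts in strict windows may need a little more headroom.

```java
// Hypothetical helper: the per-second limit used below is an assumed figure, not one from the post.
final class SyncBudget {
    static final int REQUESTS_PER_SPOT = 4; // luna, solar, water, meteo

    /** Total API requests needed to sync the given number of spots. */
    static int totalRequests(int spots) {
        return REQUESTS_PER_SPOT * spots;
    }

    /** Smallest gap in milliseconds between spots that keeps the average rate within the limit. */
    static long minTickMillis(int maxRequestsPerSecond) {
        // Each tick releases one spot, i.e. REQUESTS_PER_SPOT requests at once.
        return (long) Math.ceil(1000.0 * REQUESTS_PER_SPOT / maxRequestsPerSecond);
    }

    public static void main(String[] args) {
        System.out.println(totalRequests(10)); // 40
        System.out.println(minTickMillis(10)); // 400
    }
}
```

Under that assumed limit, 10 spots released one per 400 ms tick would take about 4 seconds to sync instead of 0.75s.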

Self-throttling

I want to use Rx to simplify the process of self-throttling my API requests. Here is a (hopefully accurate) marble chart of what I want to achieve;

Figure 1: Marble chart showing desired stream composition

The SynchronisationService.java code looks a bit like this;

    Observable.zip(
        Observable.from(spots),
        Observable.interval(SYNC_TICK, TimeUnit.MILLISECONDS),
        (obs, timer) -> obs)
        .subscribeOn(scheduler)
        .observeOn(scheduler)
        .unsubscribeOn(scheduler)
        .flatMap(spot -> syncTidePosition.get().buildObservable(spot))
        .subscribe(spotAndTideEvent -> new TideEventSubscriber(
            lunaEventService,
            synchronisationIntentProvider.get(),
            spotAndTideEvent.spot,
            String.format(
                getString(string.tide_error_message),
                spotAndTideEvent.spot.getTidePosition()
            ),
            errorHandlerService,
            localBroadcastManager)
        );

...and the "buildObservable" call looks like this;

Observable<SpotAndTideEventTuple> buildObservable(final Spot spot) {
    return Observable.zip(
        Observable.just(spot),
        lunaEventsProvider
            .listTideTimes(
                spot.getTideOperator(),
                Integer.toString(spot.getTidePosition())
            ),
        SpotAndTideEventTuple::new
    );
}

...and the lunaEventsProvider.listTideTimes(...) method looks like;

public Observable<List<TideEvent>> listTideTimes(@NonNull final LunaProvider provider,
                                                 @NonNull final String identifier) {
    return getRetrofitServiceImpl(LunaEventsProviderDefinition.class)
        .listTideTimes(provider, identifier)
        .map(TideEventsTemplate::buildModels);
}
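For comparison, the pacing that zipping Observable.from(spots) with Observable.interval(...) is meant to produce (one spot released per tick) can be sketched without Rx. This is a JDK-only analogue, not RxJava code; PacedDispatcher and dispatch are made-up names, and the strings stand in for Spot objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical JDK-only stand-in for zip(from(items), interval(tick)); not RxJava code.
final class PacedDispatcher {

    /** Passes one item to the action per tick and blocks until every item has been handled. */
    static <T> void dispatch(List<T> items, long tickMillis, Consumer<T> action) {
        if (items.isEmpty()) {
            return;
        }
        Iterator<T> remaining = new ArrayList<>(items).iterator();
        CountDownLatch done = new CountDownLatch(items.size());
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            // Like Observable.interval, the first tick fires after one full period.
            timer.scheduleAtFixedRate(() -> {
                if (!remaining.hasNext()) {
                    return;
                }
                try {
                    action.accept(remaining.next());
                } catch (RuntimeException e) {
                    // Swallowed so later ticks still run; real code should report it.
                } finally {
                    done.countDown();
                }
            }, tickMillis, tickMillis, TimeUnit.MILLISECONDS);
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        dispatch(Arrays.asList("spot-1", "spot-2", "spot-3"), 250,
            spot -> System.out.println(spot + " released after "
                + (System.currentTimeMillis() - start) + " ms"));
    }
}
```

The items come out in their original order, one per tick, which is the same shape as the marble chart: the timer sets the pace and the list supplies the values.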

The problem

As an Rx amateur, I've read much of the documentation to get this far, but upon encountering an error with the code I'm at a loss as to where to go next. Either the subscription isn't causing the emissions to start (as with the snippets shown) or, if I tweak things a little, I get an unhelpful low-level NPE (rx.Scheduler).

Where should I go from here? Am I on the right track with using Rx for the scenario described? Any help appreciated.

BrantApps asked Nov 09 '22 04:11


1 Answer

Somewhat embarrassingly, the NPE errors I was seeing had nothing to do with Rx. The scheduler I had specified to run the operation on was meant to be injected into the android.app.Service, but owing to a slight 'misconfiguration' (omitting the @Inject annotation!) the scheduler variable was null.

There is small comfort in knowing why I missed this: my Scheduler injection is also qualified, so it 'looked' the same as my other declarations at the top of the class. With the missing annotation restored, they read;

@Inject @IoScheduler Scheduler scheduler;
@Inject LocalBroadcastManager localBroadcastManager;
@Inject NotificationManager notificationManager;
@Inject SharedPreferences sharedPrefs;
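One way to make this kind of mistake surface earlier is to check injected fields once, right after injection, so the failure names the field instead of appearing deep inside rx.Scheduler. A minimal sketch using only the JDK; InjectionGuard and requireInjected are hypothetical names, not part of any injection library.

```java
import java.util.Objects;

// Hypothetical helper for failing fast on fields that were never injected.
final class InjectionGuard {

    /** Returns the dependency unchanged, or throws an NPE that names the offending field. */
    static <T> T requireInjected(T dependency, String fieldName) {
        return Objects.requireNonNull(dependency,
            fieldName + " is null: is the field missing its @Inject annotation?");
    }
}
```

Assuming the service performs its injection in onCreate(), a call such as requireInjected(scheduler, "scheduler") immediately afterwards would have pointed straight at the missing annotation.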

Well, I had fun building those marble diagrams and pulling apart my understanding of Rx. The current call now coordinates all 4 of the API requests and looks like this;

    Observable.zip(
        Observable.from(spots),
        Observable.interval(SYNC_TICK, TimeUnit.MILLISECONDS),
        (obs, timer) -> obs)
        .subscribeOn(scheduler)
        .observeOn(scheduler)
        .unsubscribeOn(scheduler)
        .flatMap(this::buildObservable)
        .subscribe(
            new EventSubscriber(
                lunaEventService,
                solarService,
                swellService,
                conditionsService,
                synchronisationIntentProvider.get(),
                errorHandlerService,
                localBroadcastManager,
                TRENDING_LENGTH_DAYS
            )
        );
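The body of buildObservable is not shown here, so the following is only a guess at the general shape of "wait for all four results for a spot, then hand them on together". It uses the JDK's CompletableFuture as a stand-in rather than RxJava (whose Observable.zip also has overloads for more than two sources). SpotSync, SpotConditions and the String payloads are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical JDK-only stand-in for a four-way zip; names and String payloads are made up.
final class SpotSync {

    /** The four results that make up one spot's complete data set. */
    static final class SpotConditions {
        final String tide;
        final String sun;
        final String swell;
        final String weather;

        SpotConditions(String tide, String sun, String swell, String weather) {
            this.tide = tide;
            this.sun = sun;
            this.swell = swell;
            this.weather = weather;
        }
    }

    /** Completes once all four requests have completed; fails if any of them fails. */
    static CompletableFuture<SpotConditions> combine(
            CompletableFuture<String> tide,
            CompletableFuture<String> sun,
            CompletableFuture<String> swell,
            CompletableFuture<String> weather) {
        return CompletableFuture.allOf(tide, sun, swell, weather)
            // Every future is already complete here, so join() does not block.
            .thenApply(ignored -> new SpotConditions(
                tide.join(), sun.join(), swell.join(), weather.join()));
    }

    public static void main(String[] args) {
        SpotConditions c = combine(
            CompletableFuture.supplyAsync(() -> "tide data"),
            CompletableFuture.supplyAsync(() -> "sun data"),
            CompletableFuture.supplyAsync(() -> "swell data"),
            CompletableFuture.supplyAsync(() -> "weather data")).join();
        System.out.println(c.tide + " | " + c.sun + " | " + c.swell + " | " + c.weather);
    }
}
```

The point of the shape is that the subscriber receives one complete value per spot, so it never has to track which of the four responses has arrived.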

This is part way through the refactor of this service, so I expect it to change a bit more, especially when it comes to getting the tests underneath green. Glad I stuck with it: using Rx removes roughly 50 to 100 lines of code each time I learn a new function!

BrantApps answered Nov 14 '22 21:11