
How can I have a side-effect subscription that does not alter the stream?

Tags:

rx-java

Scenario:

I have a list of data (transactions for a business). This data is considered stale if the following two conditions are met:

  • The user (UI/View) is subscribed to the data source (an rx.Observable<List<Transaction>>)
  • >= 5 minutes have passed since the last network synchronization. The last-sync timestamp is persisted in the database and observed from there.

Note: if the UI/view is NOT subscribed to the data source and >= 5 minutes have passed, I do NOT want to sync fresh data (because no one is listening).

Some details for the code sample below:

  • data.observe() returns rx.Observable<List<Transaction>>
  • void syncIfLast5Minutes() will imperatively check if there has been a sync in the last five minutes - if not, then a new network request will be executed to fetch fresh data
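
For illustration only, the imperative check described above might look roughly like the sketch below. `SyncGate`, `lastSyncMillis`, the injected clock, and the `Runnable` standing in for the network request are all hypothetical names; the question does not show the real implementation, and the real last-sync timestamp lives in the database rather than in a field.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

public class SyncGate {

    // Data older than this is treated as stale.
    static final long MAX_AGE_MILLIS = TimeUnit.MINUTES.toMillis(5);

    private final LongSupplier clock;    // current time in millis (injectable so it can be tested)
    private final Runnable networkSync;  // hypothetical stand-in for the real network request
    private long lastSyncMillis;         // assumption: kept in memory here, persisted in the real app

    public SyncGate(LongSupplier clock, Runnable networkSync, long lastSyncMillis) {
        this.clock = clock;
        this.networkSync = networkSync;
        this.lastSyncMillis = lastSyncMillis;
    }

    // Runs the sync only if the last one happened at least five minutes ago.
    public synchronized void syncIfLast5Minutes() {
        long now = clock.getAsLong();
        if (now - lastSyncMillis >= MAX_AGE_MILLIS) {
            networkSync.run();
            lastSyncMillis = now;
        }
    }
}
```

Passing the clock in, rather than calling `System.currentTimeMillis()` directly, is just one way to make the five-minute rule checkable without waiting.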

Now, I could easily do something like:

data.observe()
    // doOnSubscribe takes a no-argument action in RxJava 1.x
    .doOnSubscribe(() -> syncIfLast5Minutes());

But this would only check whether the data is stale on the initial subscription by the UI. If the UI is still subscribed after 5 minutes, no automatic refresh is triggered (but I want one to be).

I am looking for a side-effect operator that will

  • not affect the original stream
  • start a timer on subscription that monitors when the data becomes stale (>= 5 minutes have passed)
  • cancel this timer on unsubscription

Is there an idiomatic way in RxJava to achieve this reactively?

ZakTaccardi asked Nov 09 '22 05:11


1 Answer

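You can achieve what you want by merging your stream with an Observable.interval that performs your side-effect (refresh of data) and has ignoreElements applied to it. Because the interval's items are discarded, the merged stream emits exactly what the original stream emits, and unsubscribing from the merged stream also unsubscribes from the interval, which cancels the timer.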

This is a working example:

import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.subjects.PublishSubject;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        PublishSubject<String> data = PublishSubject.create();
        // refresh data every second for this demo;
        // for your use case it would be every 5 minutes
        data.mergeWith(Observable.interval(1, TimeUnit.SECONDS)
                // timed side-effect that pushes an update into the data stream
                .doOnNext(n -> data.onNext(n + ""))
                // drop the interval's own items so the stream is not altered
                .ignoreElements()
                .cast(String.class))
            .doOnNext(System.out::println)
            .toBlocking()
            .subscribe();
    }

}

If multiple subscribers use the stream, consider applying .share() to the merged Observable so that they all share a single timer and only one refresh happens every 5 minutes.
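
A rough sketch of how the merge-and-share idea might be packaged, assuming RxJava 1.x. `SharedRefresh` and `withPeriodicRefresh` are hypothetical names, and the `Runnable` is a stand-in for whatever triggers the refresh. The `Scheduler` parameter is there only so the timing can be driven by a `TestScheduler` instead of real time.

```java
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.Scheduler;

public class SharedRefresh {

    // Returns a stream that emits exactly what `source` emits. While at least
    // one subscriber is attached, `refresh` runs once per period; the timer
    // ticks themselves are dropped and never reach subscribers.
    public static <T> Observable<T> withPeriodicRefresh(
            Observable<T> source, Class<T> type, Runnable refresh,
            long period, TimeUnit unit, Scheduler scheduler) {
        Observable<T> ticks = Observable.interval(period, unit, scheduler)
                .doOnNext(n -> refresh.run()) // timed side-effect
                .ignoreElements()             // discard the tick values
                .cast(type);                  // only adjusts the element type for mergeWith
        // share() is publish().refCount(): one upstream subscription (and so
        // one timer) for all subscribers, released when the last one leaves.
        return source.mergeWith(ticks).share();
    }
}
```

With this shape, two views subscribed to the same returned Observable should cause one refresh per period rather than two, and the timer stops once the last view unsubscribes.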

Dave Moten answered Nov 16 '22 16:11