Scenario:
I have a list of data (transactions for a business). This data is considered stale if the following two conditions are met:
rx.Observable<List<Transaction>>
)Note: if the UI/view is NOT subscribed to the data source, and >= 5 five minutes has passed, I do NOT want to sync fresh data (because no one is listening)
Some details for the code sample below:
data.observe()
returns rx.Observable<List<Transaction>>
void syncIfLast5Minutes()
will imperatively check if there has been a sync in the last five minutes - if not, then a new network request will be executed to fetch fresh dataNow, I could easily do something like:
data.observe()
.doOnSubscribe(transactions -> syncIfLast5Minutes()
But this would only check if the data is stale on the initial subscription by the UI. If the UI is still subscribed to after 5 minutes, an automatic refresh won't trigger here (but I want it do).
I am looking for a side-effect operator that will
Is there an idiomatic way in RxJava to achieve this reactively?
You can achieve what you want by merging your stream with an Observable.interval
that performs your side-effect (refresh of data) and has ignoreElements
applied to it.
This is a working example:
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.subjects.PublishSubject;
public class Main {
public static void main(String[] args) throws InterruptedException {
PublishSubject<String> data = PublishSubject.create();
//refresh data every second for this demo
//for your use case it's every 5 minutes
data.mergeWith(Observable.interval(1, TimeUnit.SECONDS)
// cause timed side-effect that updates data stream
.doOnNext(n -> data.onNext(n + ""))
.ignoreElements()
.cast(String.class))
.doOnNext(System.out::println)
.toBlocking()
.subscribe();
}
}
If multiple subscribers are using the stream you might want to look at .share
so only one refresh action happens every 5 mins.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With