I want to create observables that do the following: I want to use a BehaviorSubject<Boolean> as a trigger and bind this trigger to an activity's onResume and onPause events. (Code example appended.)
Question
I've set something up, but it is not working as intended. I use it as follows:
Observable o = ...;
// Variant 1
o = o.lift(new RxValve(getPauser(), 1000, getPauser().getValue()));
// Variant 2
// o = o.compose(RXPauser.applyPauser(getPauser()));
o
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
Currently the problem is that Variant 1 should work fine, but sometimes the events are just not emitted: up to the valve everything works, but the valve itself does not emit (it may be a threading problem). Variant 2 is much simpler and seems to work, but I'm not sure it is really better; I don't think so. I don't actually know why Variant 1 sometimes fails, so I can't tell whether Variant 2 solves the (to me still unknown) problem.
Can someone tell me what could be the problem or if the simple solution should work reliably? Or show me a reliable solution?
Code
RxValve
https://gist.github.com/akarnokd/1c54e5a4f64f9b1e46bdcf62b4222f08
RXPauser functions
public static <T> Observable.Transformer<T, T> applyPauser(Observable<Boolean> pauser)
{
return observable -> pauser(observable, pauser);
}
private static <T> Observable<T> pauser(Observable<T> source, Observable<Boolean> pauser)
{
// this observable buffers all items that are emitted while emission is paused
Observable<T> sharedSource = source.publish().refCount();
Observable<T> queue = sharedSource
.buffer(pauser.distinctUntilChanged().filter(isResumed -> !isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> isResumed))
.flatMap(l -> Observable.from(l))
.doOnNext(t -> L.d(RXPauser.class, "Pauser QUEUED: " + t));
// this observable emits all items that are emitted while emission is not paused
Observable<T> window = sharedSource.window(pauser.distinctUntilChanged().filter(isResumed -> isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> !isResumed))
.switchMap(tObservable -> tObservable)
.doOnNext(t -> L.d(RXPauser.class, "Pauser NOT QUEUED: " + t));
// combine both observables
return queue.mergeWith(window)
.doOnNext(t -> L.d(RXPauser.class, "Pauser DELIVERED: " + t));
}
Activity
public class BaseActivity extends AppCompatActivity {
private final BehaviorSubject<Boolean> pauser = BehaviorSubject.create(false);
@Override
protected void onCreate(Bundle savedInstanceState)
{
super.onCreate(savedInstanceState);
final Class<?> clazz = this.getClass();
pauser
.doOnUnsubscribe(() -> {
L.d(clazz, "Pauser unsubscribed!");
})
.subscribe(aBoolean -> {
L.d(clazz, "Pauser - " + (aBoolean ? "RESUMED" : "PAUSED"));
});
}
public BehaviorSubject<Boolean> getPauser()
{
return pauser;
}
@Override
protected void onResume()
{
super.onResume();
pauser.onNext(true);
}
@Override
protected void onPause()
{
pauser.onNext(false);
super.onPause();
}
}
You can actually use the .buffer() operator, passing it an observable that defines when to stop buffering. A sample from the book:
Observable.interval(100, TimeUnit.MILLISECONDS).take(10)
.buffer(Observable.interval(250, TimeUnit.MILLISECONDS))
.subscribe(System.out::println);
from Part 3, 'Taming the sequence', chapter 5, 'Time-shifted sequences': https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%203%20-%20Taming%20the%20sequence/5.%20Time-shifted%20sequences.md
You can use a PublishSubject as the Observable to feed elements into in your custom operator. Every time you need to start buffering, create an instance with Observable.defer(() -> createBufferingValve())
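To make the intended behaviour concrete (deliver items while resumed, queue them while paused, flush the queue in order on resume), here is a minimal plain-Java sketch that deliberately does not use RxJava. PausableValve, onNext and setOpen are hypothetical names made up for this illustration; this is not the RxValve from the gist and not part of any library. It assumes non-null items and a downstream callback that does not call back into the valve.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical helper: buffers items while closed, flushes them in order when opened. */
final class PausableValve<T> {
    private final Queue<T> buffer = new ArrayDeque<>(); // items received while paused
    private final Consumer<T> downstream;               // where delivered items go
    private boolean open;

    PausableValve(Consumer<T> downstream, boolean initiallyOpen) {
        this.downstream = downstream;
        this.open = initiallyOpen;
    }

    /** Delivers the item immediately when open, otherwise queues it. */
    synchronized void onNext(T item) {
        if (open) {
            downstream.accept(item);
        } else {
            buffer.add(item);
        }
    }

    /** Counterpart of pauser.onNext(true/false): opening drains the queue first. */
    synchronized void setOpen(boolean nowOpen) {
        open = nowOpen;
        if (open) {
            T item;
            while ((item = buffer.poll()) != null) {
                downstream.accept(item);
            }
        }
    }
}
```

Both methods take the same lock, so a toggle can never interleave with an emission. That is the property to check in any Rx-based version if a threading problem is suspected.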
I made a similar thing for logging events.
A Subject collects events and pushes them to the server once every 10 seconds.
The main idea is as follows. Say you have a class Event:
public class Event {
public String jsonData;
public String getJsonData() {
return jsonData;
}
public Event setJsonData(String jsonData) {
this.jsonData = jsonData;
return this;
}
}
Create a queue for the events:
private PublishSubject<Event> eventQueue = PublishSubject.create();
It can also be a BehaviorSubject; it doesn't matter.
Then create the logic that pushes the events to the server:
eventObservable = eventQueue.asObservable()
        // flush events every 10 seconds; buffer() already emits a List<Event>,
        // so no toList() is needed (toList() would wait for onCompleted, which a subject never sends by itself)
        .buffer(10, TimeUnit.SECONDS)
        .doOnNext(new Action1<List<Event>>() {
            @Override
            public void call(List<Event> events) {
                apiClient.pushEvents(events); // push the batch of events
            }
        })
        .onErrorResumeNext(new Func1<Throwable, Observable<List<Event>>>() {
            @Override
            public Observable<List<Event>> call(Throwable throwable) {
                // make sure onError is never delivered; returning null here would itself fail,
                // and note that the stream completes after the first error
                return Observable.empty();
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io());
Then subscribe to it and keep the subscription until you no longer need it:
eventSubscription = eventObservable.subscribe();
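The same collect-then-flush idea can be sketched in plain Java without RxJava, which may help when reasoning about what the buffer step does. EventBatcher, add and flush are hypothetical names made up for this sketch. Unlike buffer(10, TimeUnit.SECONDS), which also emits empty lists, this version skips empty batches, and it assumes that a failing sink should not stop later flushes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: collects events and hands them to a sink in batches. */
final class EventBatcher<E> {
    private final Consumer<List<E>> sink;       // e.g. a call that pushes the batch to a server
    private List<E> pending = new ArrayList<>(); // events collected since the last flush

    EventBatcher(Consumer<List<E>> sink) {
        this.sink = sink;
    }

    /** Queues one event for the next flush. */
    synchronized void add(E event) {
        pending.add(event);
    }

    /** Hands all pending events to the sink as one batch; does nothing if there are none. */
    void flush() {
        List<E> batch;
        synchronized (this) {
            if (pending.isEmpty()) {
                return;
            }
            batch = pending;
            pending = new ArrayList<>(); // start a fresh batch before calling the sink
        }
        try {
            sink.accept(batch);
        } catch (RuntimeException e) {
            // assumption: swallow sink failures so that later flushes still run
        }
    }
}
```

To get the 10-second rhythm, flush() could be driven by a timer, for example Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(batcher::flush, 10, 10, TimeUnit.SECONDS), where batcher is an EventBatcher instance.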
Hope this helps.