
RxJava - make a pausable observable (with buffer and window for example)

I want to create observables that do the following:

  • buffer all items, while they are paused
  • immediately emit items, while they are not paused
  • the pause/resume trigger must come from another observable
  • it must be safe to use with observables that do not run on the main thread, and it must be safe to change the paused/resumed state from the main thread
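
To make the requirements above concrete, here is a minimal plain-Java sketch of the intended behaviour, without any RxJava. PausableValve, onNext and setPaused are hypothetical names invented for this illustration; they are not part of RxJava or of the RxValve gist. It assumes a simple lock is acceptable for thread safety.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical helper (not an RxJava class): buffers items while paused
// and passes them straight through while resumed.
final class PausableValve<T> {
    private final Consumer<T> downstream;
    private final Queue<T> buffer = new ArrayDeque<>();
    private boolean paused;

    PausableValve(Consumer<T> downstream, boolean initiallyPaused) {
        this.downstream = downstream;
        this.paused = initiallyPaused;
    }

    // May be called from any thread, e.g. a background producer.
    synchronized void onNext(T item) {
        if (paused) {
            buffer.add(item);          // keep the item until resumed
        } else {
            downstream.accept(item);   // deliver immediately
        }
    }

    // May be called from any thread, e.g. the main thread.
    synchronized void setPaused(boolean paused) {
        this.paused = paused;
        if (!paused) {
            // Drain everything queued during the pause, in arrival order.
            T item;
            while ((item = buffer.poll()) != null) {
                downstream.accept(item);
            }
        }
    }
}
```

Both methods take the same lock, so an item can never slip past a pause that has already been set, and queued items are always delivered before newer ones. Calling the downstream consumer while holding the lock keeps the sketch short; a real operator would more likely use a drain loop.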

I want to use a BehaviorSubject<Boolean> as the trigger and bind this trigger to an activity's onResume and onPause events. (Code example appended)

Question

I've set something up, but it is not working as intended. I use it as follows:

Observable o = ...;
// Variant 1
o = o.lift(new RxValve(getPauser(), 1000, getPauser().getValue()));
// Variant 2
// o = o.compose(RXPauser.applyPauser(getPauser()));
o
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe();

The problem is that Variant 1 should work fine, but sometimes events are simply not emitted: the valve stays silent even though everything else is working (possibly a threading problem). Variant 2 is much simpler and seems to work, but I doubt that it is really better. Since I don't know why Variant 1 sometimes fails, I can't tell whether Variant 2 actually solves the (so far unknown) problem.

Can someone tell me what could be the problem or if the simple solution should work reliably? Or show me a reliable solution?

Code

RxValve

https://gist.github.com/akarnokd/1c54e5a4f64f9b1e46bdcf62b4222f08

RXPauser functions

public static <T> Observable.Transformer<T, T> applyPauser(Observable<Boolean> pauser)
{
    return observable -> pauser(observable, pauser);
}

private static <T> Observable<T> pauser(Observable<T> source, Observable<Boolean> pauser)
{
    // this observable buffers all items that are emitted while emission is paused
    Observable<T> sharedSource = source.publish().refCount();
    Observable<T> queue = sharedSource
            .buffer(
                    pauser.distinctUntilChanged().filter(isResumed -> !isResumed),
                    aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> isResumed))
            .flatMap(l -> Observable.from(l))
            .doOnNext(t -> L.d(RXPauser.class, "Pauser QUEUED: " + t));

    // this observable emits all items that are emitted while emission is not paused
    Observable<T> window = sharedSource
            .window(
                    pauser.distinctUntilChanged().filter(isResumed -> isResumed),
                    aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> !isResumed))
            .switchMap(tObservable -> tObservable)
            .doOnNext(t -> L.d(RXPauser.class, "Pauser NOT QUEUED: " + t));

    // combine both observables
    return queue.mergeWith(window)
            .doOnNext(t -> L.d(RXPauser.class, "Pauser DELIVERED: " + t));
}

Activity

public class BaseActivity extends AppCompatActivity {

    private final BehaviorSubject<Boolean> pauser = BehaviorSubject.create(false);

    @Override
    protected void onCreate(Bundle savedInstanceState)
    {
        super.onCreate(savedInstanceState);
        final Class<?> clazz = this.getClass();
        pauser
                .doOnUnsubscribe(() -> {
                    L.d(clazz, "Pauser unsubscribed!");
                })
                .subscribe(aBoolean -> {
                    L.d(clazz, "Pauser - " + (aBoolean ? "RESUMED" : "PAUSED"));
                });
    }

    public BehaviorSubject<Boolean> getPauser()
    {
        return pauser;
    }

    @Override
    protected void onResume()
    {
        super.onResume();
        pauser.onNext(true);
    }

    @Override
    protected void onPause()
    {
        pauser.onNext(false);
        super.onPause();
    }
}
prom85 asked Sep 25 '16 09:09


2 Answers

You can use the .buffer() operator and pass it an observable that defines when each buffer is closed. A sample from the book:

Observable.interval(100, TimeUnit.MILLISECONDS).take(10)
    .buffer(Observable.interval(250, TimeUnit.MILLISECONDS))
    .subscribe(System.out::println);

This is from Part 3, 'Taming the sequence', chapter 5, 'Time-shifted sequences': https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%203%20-%20Taming%20the%20sequence/5.%20Time-shifted%20sequences.md
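
As a rough illustration of the boundary idea, the following plain-Java sketch mimics what a boundary-driven buffer does: items are collected until the boundary fires, then the collected list is handed downstream and a new list is started. BoundaryBuffer and its methods are hypothetical names made up for this sketch; this is not how RxJava implements buffer().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration of boundary-driven buffering (not RxJava code).
final class BoundaryBuffer<T> {
    private final Consumer<List<T>> downstream;
    private List<T> current = new ArrayList<>();

    BoundaryBuffer(Consumer<List<T>> downstream) {
        this.downstream = downstream;
    }

    // Called for every item of the source sequence.
    synchronized void onNext(T item) {
        current.add(item);
    }

    // Called whenever the boundary sequence emits: hand over the items
    // collected so far (possibly an empty list) and start a new buffer.
    synchronized void onBoundary() {
        List<T> full = current;
        current = new ArrayList<>();
        downstream.accept(full);
    }
}
```

In the pause/resume scenario, the boundary call would correspond to the "resumed" signal: everything collected during the pause is released as one batch.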

You can use a PublishSubject as the Observable that feeds elements into your custom operator. Every time you need to start buffering, create a new instance with Observable.defer(() -> createBufferingValve()).

Alex Shutov answered Nov 10 '22 07:11


I built something similar for logging events: a subject collects events and pushes them to the server once every 10 seconds.

Suppose, for example, that you have an Event class:

public class Event {

    public String jsonData;

    public String getJsonData() {
        return jsonData;
    }

    public Event setJsonData(String jsonData) {
        this.jsonData = jsonData;
        return this;
    }
}

Create a queue for the events:

private PublishSubject<Event> eventQueue = PublishSubject.create();

It could also be a BehaviorSubject; it doesn't matter.

Then create the logic that pushes the events to the server:

    eventObservable = eventQueue.asObservable()
            // emit the collected events as a List<Event> every 10 seconds;
            // no toList() here, it would wait for the subject to complete
            .buffer(10, TimeUnit.SECONDS)
            .doOnNext(new Action1<List<Event>>() {
                @Override
                public void call(List<Event> events) {
                    apiClient.pushEvents(events);     // push the batch to the server
                }
            })
            .onErrorResumeNext(new Func1<Throwable, Observable<List<Event>>>() {
                @Override
                public Observable<List<Event>> call(Throwable throwable) {
                    // swallow the error so that onError is never delivered;
                    // returning null here would cause a NullPointerException
                    return Observable.empty();
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io());

Then subscribe to it and keep the subscription for as long as you need it:

eventSubscription = eventObservable.subscribe();

Hope this helps

Aleksandr answered Nov 10 '22 08:11