Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Incorrect understanding of buffer in RxJava

I'm trying to get familiar with RxJava. Here's the use case that i'm trying to achieve:

I have a button on my screen and i'm trying to collect the number of taps. So if a user taps the button, a click is registered and a log is produced. Now if the user clicks the button twice, then it registers both clicks, collects them and outputs 2 instead of 1.

Essentially, i'm trying to accumulate the number of clicks, over a span of time and then spit out the final results. I'm guessing "buffer" is the method that I need to be using. I whipped up a quick example in Android (code follows), but the buffer method doesn't seem to be as simple as just collect all event inputs and spit out a collection.

public class DemoFragment
    extends Fragment {

    private int _tapCount = 0;
    private Observable<List<Integer>> _bufferedObservable;
    private Observer<List<Integer>> _observer;

    @Override
    public void onActivityCreated(@Nullable Bundle savedInstanceState) {
        super.onActivityCreated(savedInstanceState);
        _setupLogger();

        _bufferedObservable = _getBufferedObservable();
        _observer = _getObserver();
    }


    // the library butterknife allows this
    @OnClick(R.id.btn_start_operation)
    public void onButtonTapped() {
        _log("GOT A TAP");
        _bufferedObservable.subscribeOn(Schedulers.io())
                         .observeOn(AndroidSchedulers.mainThread())
                         .subscribe(_observer);
    }

    private Observable<List<Integer>> _getBufferedObservable() {
        return Observable.create(new Observable.OnSubscribe<Integer>() {


            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);   // send one tap here
            }

        }).buffer(2, TimeUnit.SECONDS); // collect all taps in the last 2s
    }

    private Observer<List<Integer>> _getObserver() {
        return new Observer<List<Integer>>() {


            @Override
            public void onCompleted() {
                _log(String.format("%d taps", _tapCount));
                _tapCount = 0; // reset tap count
            }

            @Override
            public void onError(Throwable e) {}

            @Override
            public void onNext(List<Integer> integers) {
                if (integers.size() > 0) {
                    for (int i : integers) {
                        _tapCount += i;
                    }
                    onCompleted();
                } else {
                    _log("No taps received");
                }
            }
        };
    }

    // ... other method that help wiring up the example (irrelevant to RxJava)
}

Can anyone help me understand the misconception in my understanding here?

Problem 1: I'm expecting _getObserver()'s onNext to send me a list with the accumulated number of taps. So if the button was hit 5 times, then i'm expecting a List with 5 items, each of which have the value "1". With the existing code, i'm always getting an empty list.

Problem 2: I basically do a console log if no events were received by checking List<Integer> integers size. If list is not empty, i throw in a console log, saying "not taps received". It appears that the Observable NEVER stops. It's almost like a timer, where it constantly keeps going on and on, even when no button tap was registered. Is there way to stop the Observable, if no event was registered in the last 10s?

Problem 3: The number of emits seem to almost exponentially increase. It's almost like its gathering the button empty taps from all previous times.

like image 558
Kaushik Gopal Avatar asked Jan 11 '23 04:01

Kaushik Gopal


1 Answers

Here is a code showing how I would do it (assuming your button's id is R.id.rx_button):

private Subscription mSubscription;

@Override
protected void onResume() {
    super.onResume();
    mSubscription = Observable.create(new OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            findViewById(R.id.rx_button).setOnClickListener(new OnClickListener() {
                @Override
                public void onClick(View v) {
                    subscriber.onNext(1);
                }
            });
        }
    }).buffer(2, TimeUnit.SECONDS)
            .subscribe(new Action1<List<Integer>>() {
                @Override
                public void call(List<Integer> integers) {
                    Log.i("TAG", String.valueOf(integers.size()));
                }
            });
}

@Override
protected void onPause() {
    super.onPause();
    mSubscription.unsubscribe();
}

Simply put, just close the OnClickListener implementation over the call method, so you will be able to use the subscriber object inside it.

The onResume will look even better with lambdas (take a look on the Retrolambda project):

@Override
protected void onResume() {
    super.onResume();
    mSubscription = Observable.create((Subscriber<? super Integer> subscriber) ->
            findViewById(R.id.rx_button).setOnClickListener(view ->
                    subscriber.onNext(1))).buffer(2, TimeUnit.SECONDS)
            .subscribe(integers -> Log.i("TAG", String.valueOf(integers.size())));
}
like image 158
tomrozb Avatar answered Jan 20 '23 18:01

tomrozb