I'm trying to get familiar with RxJava. Here's the use case that i'm trying to achieve:
I have a button on my screen and i'm trying to collect the number of taps. So if a user taps the button, a click is registered and a log is produced. Now if the user clicks the button twice, then it registers both clicks, collects them and outputs 2 instead of 1.
Essentially, i'm trying to accumulate the number of clicks, over a span of time and then spit out the final results. I'm guessing "buffer" is the method that I need to be using. I whipped up a quick example in Android (code follows), but the buffer method doesn't seem to be as simple as just collect all event inputs and spit out a collection.
public class DemoFragment
extends Fragment {
private int _tapCount = 0;
private Observable<List<Integer>> _bufferedObservable;
private Observer<List<Integer>> _observer;
@Override
public void onActivityCreated(@Nullable Bundle savedInstanceState) {
super.onActivityCreated(savedInstanceState);
_setupLogger();
_bufferedObservable = _getBufferedObservable();
_observer = _getObserver();
}
// the library butterknife allows this
@OnClick(R.id.btn_start_operation)
public void onButtonTapped() {
_log("GOT A TAP");
_bufferedObservable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(_observer);
}
private Observable<List<Integer>> _getBufferedObservable() {
return Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1); // send one tap here
}
}).buffer(2, TimeUnit.SECONDS); // collect all taps in the last 2s
}
private Observer<List<Integer>> _getObserver() {
return new Observer<List<Integer>>() {
@Override
public void onCompleted() {
_log(String.format("%d taps", _tapCount));
_tapCount = 0; // reset tap count
}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(List<Integer> integers) {
if (integers.size() > 0) {
for (int i : integers) {
_tapCount += i;
}
onCompleted();
} else {
_log("No taps received");
}
}
};
}
// ... other method that help wiring up the example (irrelevant to RxJava)
}
Can anyone help me understand the misconception in my understanding here?
Problem 1: I'm expecting _getObserver()
's onNext
to send me a list with the accumulated number of taps. So if the button was hit 5 times, then i'm expecting a List with 5 items, each of which have the value "1". With the existing code, i'm always getting an empty list.
Problem 2: I basically do a console log if no events were received by checking List<Integer> integers
size. If list is not empty, i throw in a console log, saying "not taps received". It appears that the Observable NEVER stops. It's almost like a timer, where it constantly keeps going on and on, even when no button tap was registered. Is there way to stop the Observable, if no event was registered in the last 10s?
Problem 3: The number of emits seem to almost exponentially increase. It's almost like its gathering the button empty taps from all previous times.
Here is a code showing how I would do it (assuming your button's id is R.id.rx_button
):
private Subscription mSubscription;
@Override
protected void onResume() {
super.onResume();
mSubscription = Observable.create(new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
findViewById(R.id.rx_button).setOnClickListener(new OnClickListener() {
@Override
public void onClick(View v) {
subscriber.onNext(1);
}
});
}
}).buffer(2, TimeUnit.SECONDS)
.subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
Log.i("TAG", String.valueOf(integers.size()));
}
});
}
@Override
protected void onPause() {
super.onPause();
mSubscription.unsubscribe();
}
Simply put, just close the OnClickListener
implementation over the call
method, so you will be able to use the subscriber
object inside it.
The onResume
will look even better with lambdas (take a look on the Retrolambda project):
@Override
protected void onResume() {
super.onResume();
mSubscription = Observable.create((Subscriber<? super Integer> subscriber) ->
findViewById(R.id.rx_button).setOnClickListener(view ->
subscriber.onNext(1))).buffer(2, TimeUnit.SECONDS)
.subscribe(integers -> Log.i("TAG", String.valueOf(integers.size())));
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With