I have the following code, which creates a custom Observable using the Observable.create(OnSubscribe) method:
    import java.util.Scanner;
    import java.util.Timer;
    import java.util.TimerTask;
    import rx.Observable;
    import rx.Subscription;

    public class Main {
        public static void main(String[] args) {
            Subscription subscription = Observable
                .create(subscriber -> {
                    Timer timer = new Timer();
                    TimerTask task = new TimerTask() {
                        @Override
                        public void run() {
                            subscriber.onNext("tick! tack!");
                        }
                    };
                    // Emit immediately, then once per second.
                    timer.scheduleAtFixedRate(task, 0L, 1000L);
                })
                .subscribe(System.out::println);
            // Block until the user presses enter.
            new Scanner(System.in).nextLine();
            System.err.println("finishing");
            subscription.unsubscribe();
        }
    }
The Observable emits a string every second using a timer. When the user presses enter, the subscription is cancelled.
However, the timer keeps running. How can I cancel the timer? I guess there must be a hook somewhere, but I can't find it.
In .NET, the create method would let me return an IDisposable, which could be my own implementation that stops the timer. I am not sure how to map this to RxJava, as its subscribe method returns void.
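To make the IDisposable idea concrete, here is a minimal plain-Java sketch of the pattern the question describes: a function starts the timer and hands back a handle whose close() stops it. It deliberately uses no RxJava; `TickerDemo`, `startTicker`, and the `periodMillis` parameter are hypothetical names chosen for illustration, and `AutoCloseable` stands in for IDisposable.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.function.Consumer;

// Plain-Java sketch (no RxJava): the names below are illustrative, not library API.
class TickerDemo {

    // Starts a timer that calls onTick every periodMillis and returns a handle
    // whose close() cancels the timer, similar in spirit to .NET's IDisposable.
    static AutoCloseable startTicker(Consumer<String> onTick, long periodMillis) {
        Timer timer = new Timer(true); // daemon thread, so it cannot keep the JVM alive
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                onTick.accept("tick! tack!");
            }
        }, 0L, periodMillis);
        // Timer.cancel() discards all scheduled tasks; a task that is already
        // running is allowed to finish.
        return timer::cancel;
    }

    public static void main(String[] args) throws Exception {
        AutoCloseable handle = startTicker(System.out::println, 100L);
        Thread.sleep(350L);  // let a few ticks through
        handle.close();      // cancel the timer; no further ticks are scheduled
        System.err.println("finishing");
    }
}
```

In RxJava 1.x the closest counterpart to returning such a handle is, as far as I know, registering the cleanup on the subscriber itself, for example subscriber.add(Subscriptions.create(...)) with an action that cancels the timer.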
A more declarative (and IMHO easier to read) solution would be to use the Observable.using method:
    Observable<String> obs = Observable.using(
        // resource factory:
        () -> new Timer(),
        // observable factory:
        timer -> Observable.create(subscriber -> {
            TimerTask task = new TimerTask() {
                @Override
                public void run() {
                    subscriber.onNext("tick! tack!");
                }
            };
            timer.scheduleAtFixedRate(task, 0L, 1000L);
        }),
        // dispose action:
        timer -> timer.cancel()
    );
You declare how the dependent resource (the Timer) is created, how it is used to create an Observable, and how it is disposed of. RxJava then takes care of creating the timer on subscription and disposing of it on unsubscription.
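The order of those three steps can be illustrated with a small plain-Java analogy. This is only a sketch of the lifecycle, not RxJava's implementation: `UsingSketch`, `subscribe`, and `demo` are hypothetical names, and a StringBuilder stands in for the Timer so the result is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Plain-Java analogy for the three arguments of Observable.using.
// UsingSketch and its methods are hypothetical, not RxJava API.
class UsingSketch {

    // "Subscribes": creates the resource, hands it to the use step, and returns
    // a handle that runs the dispose action at most once when closed.
    static <R> AutoCloseable subscribe(Supplier<R> resourceFactory,
                                       Consumer<R> use,
                                       Consumer<R> disposeAction) {
        R resource = resourceFactory.get();
        use.accept(resource);
        AtomicBoolean disposed = new AtomicBoolean(false);
        return () -> {
            if (disposed.compareAndSet(false, true)) {
                disposeAction.accept(resource);
            }
        };
    }

    // Records the lifecycle events in the order they happen.
    static List<String> demo() throws Exception {
        List<String> events = new ArrayList<>();
        AutoCloseable handle = subscribe(
            () -> { events.add("create"); return new StringBuilder("resource"); },
            r -> events.add("use " + r),
            r -> events.add("dispose " + r));
        handle.close();  // the "unsubscribe" step triggers the dispose action
        handle.close();  // a second close is a no-op
        return events;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // prints [create, use resource, dispose resource]
    }
}
```

The point of the analogy is that the caller only ever holds the handle; the resource itself never leaks out, which is what makes the declarative form hard to misuse.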