Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to cleanup on Observable created with the .create(OnSubscribe) method

Tags:

java

rx-java

I have the following code, which creates a custom Observable using the Observable.create(OnSubscribe) method:

public class Main {

    public static void main(String[] args) {
        Subscription subscription = Observable
                .create(subscriber -> {
                    Timer timer = new Timer();
                    TimerTask task = new TimerTask() {

                        @Override
                        public void run() {
                            subscriber.onNext("tick! tack!");
                        }
                    };
                    timer.scheduleAtFixedRate(task, 0L, 1000L);
                })
                .subscribe(System.out::println);

        new Scanner(System.in).nextLine();
        System.err.println("finishing");

        subscription.unsubscribe();
    }
}

The Observable issues a string every second using a timer. When the user presses enter, the subscriptions is cancelled.

However, the timer is still executed. How can I cancel the timer? I guess there must be a hook somewhere, but I can't find it.

On .NET, the create method would return an IDisposable which I could be my implementation to stop the timer. I am not sure how to map it to RxJava, as its subscribe method is void.

like image 487
wujek Avatar asked Dec 08 '22 02:12

wujek


1 Answers

A more declarative (and IMHO easier to read) solution would be to use the Observable.using method:

Observable<String> obs = Observable.using(
    // resource factory:
    () -> new Timer(),
    // observable factory:
    timer -> Observable.create(subscriber -> {
        TimerTask task = new TimerTask() {
            public void run() {
                subscriber.onNext("tick! tack!");
            }
        };
        timer.scheduleAtFixedRate(task, 0L, 1000L);
    }),
    // dispose action:
    timer -> timer.cancel()
);

You declare how the dependent resource (the Timer) is created, how it's used to create an Observable, and how it's disposed of, and RxJava will take care of creating the timer on subscription and disposing of it on unsubscription.

like image 183
Samuel Gruetter Avatar answered Feb 23 '23 00:02

Samuel Gruetter