
RxJava and terminating streams

I'm new to reactive programming with RxJava, and after going through the simpler examples I'm now trying to figure out how to work with continuous streams. The problem with the example below is that the program doesn't terminate after I've taken the 3 elements. My assumption is that I somehow need to unsubscribe from my observable, but I don't fully grasp how to terminate the while loop and make the program exit.

I've come across the post "RxJava -- Terminating Infinite Streams", but I still can't figure out what I'm missing.

class MyTwitterDataProvider {
/*
This example is written in Groovy

Instance variables and constructor omitted
*/

public Observable<String> getTweets() {
    BufferedReader reader = new BufferedReader(new InputStreamReader(getTwitterStream()))

    Observable.create({ observer ->
        executor.execute(new Runnable() {
            def void run() {
                String newLine
                while ((newLine = reader.readLine()) != null) {
                    System.out.println("printing tweet: $newLine")
                    observer.onNext(newLine)
                }

                observer.onCompleted()
            }
        })
    })
}

def InputStream getTwitterStream() {
// code omitted
}

public static void main (String [] args) {
    MyTwitterDataProvider provider = new MyTwitterDataProvider()
    Observable<String> myTweetsObservable = provider.getTweets().take(3)

    Subscription myTweetSubscription = myTweetsObservable.subscribe({tweet-> println("client prints: $tweet")})
   // myTweetSubscription.unsubscribe()
}
}
MrKY asked Apr 21 '26 04:04



1 Answer

take(3) unsubscribes after the third element, but your loop never notices and keeps reading. You must add a check in your loop to see if the observer is still subscribed:

            while ((newLine = reader.readLine()) != null && !observer.isUnsubscribed()) {
                System.out.println("printing tweet: $newLine")
                observer.onNext(newLine)
            }
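
To see why the check matters: in RxJava 1.x, take(3) unsubscribes from the source once the third item has been delivered, but a hand-written loop only stops if it looks at that flag. Below is a minimal stdlib-only Java sketch of the same cooperative check. It does not use RxJava; TerminatingStreamSketch, SimpleSubscription, pump, and takeFrom are hypothetical names standing in for the subscriber, the reader loop, and take(n). Unlike the snippet above, the sketch tests the flag before calling readLine(), so the loop does not block waiting for a fourth line it would only discard.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Stdlib-only stand-in for the RxJava pieces in the question.
// Every name in this class is hypothetical and chosen for illustration.
class TerminatingStreamSketch {

    // Plays the role of the subscriber's "am I still subscribed?" flag.
    static final class SimpleSubscription {
        private final AtomicBoolean unsubscribed = new AtomicBoolean(false);

        void unsubscribe() {
            unsubscribed.set(true);
        }

        boolean isUnsubscribed() {
            return unsubscribed.get();
        }
    }

    // Producer loop: checks the flag BEFORE each read, so it stops as soon as
    // the consumer has unsubscribed. Returns how many lines were read.
    static int pump(BufferedReader reader, SimpleSubscription subscription,
                    Consumer<String> onNext) {
        int linesRead = 0;
        try {
            String line;
            while (!subscription.isUnsubscribed()
                    && (line = reader.readLine()) != null) {
                linesRead++;
                onNext.accept(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return linesRead;
    }

    // Mimics take(limit): collects up to `limit` lines, then unsubscribes.
    static List<String> takeFrom(BufferedReader reader, int limit) {
        SimpleSubscription subscription = new SimpleSubscription();
        List<String> taken = new ArrayList<>();
        if (limit <= 0) {
            subscription.unsubscribe(); // nothing requested, never start reading
        }
        pump(reader, subscription, line -> {
            taken.add(line);
            if (taken.size() >= limit) {
                subscription.unsubscribe();
            }
        });
        return taken;
    }

    public static void main(String[] args) throws IOException {
        BufferedReader reader =
                new BufferedReader(new StringReader("t1\nt2\nt3\nt4\nt5\n"));
        System.out.println(takeFrom(reader, 3));
        // The loop stopped after three reads, so the fourth line is still there.
        System.out.println(reader.readLine());
    }
}
```

With the input above, main prints [t1, t2, t3] and then t4, showing that the fourth line was never consumed. If the omitted executor is a standard ExecutorService with non-daemon worker threads (an assumption, since its construction is not shown), the JVM may still stay alive after the loop ends until executor.shutdown() is called.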
alexwen answered Apr 23 '26 01:04



