Logo Questions Linux Laravel Mysql Ubuntu Git Menu

Android Retrofit 2 + RxJava: listen to endless stream

Can I use Retrofit + RxJava to listen to an endless stream? For instance the Twitter stream. What I have is this:

public interface MeetupAPI {
    Observable<RSVP> getRSVPs();

MeetupAPI api = new Retrofit.Builder()

        .subscribe(rsvp -> Log.d(TAG, "got rsvp"),
                error -> Log.d(TAG, "error: " + error),
                () -> Log.d(TAG, "onComplete"));

but the "onComplete" is invoked after the first object has been parsed. Is there a way to tell Retrofit to stay open until further notice?

like image 899
ticofab Avatar asked Apr 13 '16 15:04


1 Answers

Here my solution:

You can use the @Streaming annotation:

public interface ITwitterAPI {

    Observable<ResponseBody> twitterStream();

ITwitterAPI api = new Retrofit.Builder()

With @Streaming we can get raw input From ResponseBody.

Here my function to wrap body divided by lines with events:

public static Observable<String> events(BufferedSource source) {
    return Observable.create(new Observable.OnSubscribe<String>() {
        public void call(Subscriber<? super String> subscriber) {
            try {
                while (!source.exhausted()) {
            } catch (IOException e) {

And result usage:

  .flatMap(responseBody -> events(responseBody.source()))

upd about gracefully stopping

When we unsubscribing, retrofit closes inputstream. But impossible to detect inputstream closed or not from inputstream themselves, so only way - try reading from stream - we gets exception with Socket closed message. We can interpret this exception as closing:

        public void call(Subscriber<? super String> subscriber) {
            boolean isCompleted = false;
            try {
                while (!source.exhausted()) {
            } catch (IOException e) {
                if (e.getMessage().equals("Socket closed")) {
                    isCompleted = true;
                } else {
                    throw new UncheckedIOException(e);
            //if response end we get here
            if (!isCompleted) {

And if connection closed because response end, we haven't any exceptions. Here isCompleted check for that. Let me know if i am wrong :)

like image 127
zella Avatar answered Oct 09 '22 22:10
