Can I use Retrofit + RxJava to listen to an endless stream? For instance the Twitter stream. What I have is this:
    public interface MeetupAPI {
        @GET("http://stream.meetup.com/2/rsvps/")
        Observable<RSVP> getRSVPs();
    }

    MeetupAPI api = new Retrofit.Builder()
            .baseUrl(MeetupAPI.RSVP_API)
            .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
            .addConverterFactory(GsonConverterFactory.create())
            .build()
            .create(MeetupAPI.class);

    api.getRSVPs()
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(rsvp -> Log.d(TAG, "got rsvp"),
                    error -> Log.d(TAG, "error: " + error),
                    () -> Log.d(TAG, "onComplete"));
But "onComplete" is invoked as soon as the first object has been parsed. Is there a way to tell Retrofit to keep the connection open until further notice?
Here is my solution:
You can use the @Streaming annotation:
    public interface ITwitterAPI {
        @GET("/2/rsvps")
        @Streaming
        Observable<ResponseBody> twitterStream();
    }

    ITwitterAPI api = new Retrofit.Builder()
            .baseUrl("http://stream.meetup.com")
            .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
            .build().create(ITwitterAPI.class);
With @Streaming, Retrofit does not buffer the whole response body in memory, so we can read the raw input from the ResponseBody as it arrives.
Here is my function that wraps the body, split into lines, as a stream of events:
    public static Observable<String> events(BufferedSource source) {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                try {
                    // Emit one event per line until the server ends the response.
                    while (!source.exhausted()) {
                        subscriber.onNext(source.readUtf8Line());
                    }
                    subscriber.onCompleted();
                } catch (IOException e) {
                    e.printStackTrace();
                    subscriber.onError(e);
                }
            }
        });
    }
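The loop above runs until the source is exhausted, so it may be worth also checking whether anyone is still listening before each read. Here is a minimal sketch of that idea using only the JDK (no Retrofit, RxJava, or Okio). `LineEvents`, `forEachLine`, and the `cancelled` supplier are hypothetical names for illustration; `cancelled` stands in for something like `subscriber.isUnsubscribed()`.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

public class LineEvents {

    /**
     * Reads newline-delimited records from the stream and passes each one to
     * onLine. Stops when the stream ends or when cancelled reports true.
     * Returns the number of lines delivered.
     */
    public static int forEachLine(InputStream in,
                                  BooleanSupplier cancelled,
                                  Consumer<String> onLine) {
        int delivered = 0;
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            // Check the cancellation flag before every blocking read.
            while (!cancelled.getAsBoolean() && (line = reader.readLine()) != null) {
                onLine.accept(line);
                delivered++;
            }
        } catch (IOException e) {
            // Assumption: callers prefer an unchecked exception here.
            throw new UncheckedIOException(e);
        }
        return delivered;
    }
}
```

The flag is only consulted between lines, so a read that is already blocked on the socket still has to be interrupted by closing the stream.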
And the resulting usage:
    api.twitterStream()
            .flatMap(responseBody -> events(responseBody.source()))
            .subscribe(System.out::println);
Update: stopping gracefully
When we unsubscribe, Retrofit closes the input stream. There is no way to ask the stream itself whether it has been closed, so the only option is to try reading from it, which throws an exception with the message "Socket closed".
We can interpret this exception as completion:
    @Override
    public void call(Subscriber<? super String> subscriber) {
        boolean isCompleted = false;
        try {
            while (!source.exhausted()) {
                subscriber.onNext(source.readUtf8Line());
            }
        } catch (IOException e) {
            // getMessage() may be null, so compare from the constant side.
            if ("Socket closed".equals(e.getMessage())) {
                isCompleted = true;
                subscriber.onCompleted();
            } else {
                // Any other I/O failure is a real error.
                subscriber.onError(e);
                return;
            }
        }
        // If the response ended normally, we get here.
        if (!isCompleted) {
            subscriber.onCompleted();
        }
    }
If the connection closes because the response ended, no exception is thrown. The isCompleted check covers that case. Let me know if I am wrong :)
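Matching on the exception message ties the logic to one particular wording. A possible alternative, sketched here with only the JDK and under the assumption that you can track your own cancellation: treat any IOException that arrives after you asked to stop as a normal close. `StreamEnd`, `Outcome`, and `drain` are hypothetical names; in RxJava 1 the flag could be replaced by `subscriber.isUnsubscribed()`.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class StreamEnd {

    /** How the read loop finished. */
    public enum Outcome { END_OF_RESPONSE, CLOSED_BY_US, FAILED }

    /**
     * Delivers every line to onLine and reports why reading stopped.
     * An IOException counts as a graceful close only if cancelled was set,
     * regardless of the exception message.
     */
    public static Outcome drain(BufferedReader reader,
                                AtomicBoolean cancelled,
                                Consumer<String> onLine) {
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                onLine.accept(line);
            }
            // readLine() returned null: the server ended the response.
            return Outcome.END_OF_RESPONSE;
        } catch (IOException e) {
            // We closed the stream ourselves, so the failure is expected.
            return cancelled.get() ? Outcome.CLOSED_BY_US : Outcome.FAILED;
        }
    }
}
```

A caller would map END_OF_RESPONSE and CLOSED_BY_US to onCompleted and FAILED to onError.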