I'm looking for the best method to wait for async tasks to finish in rx-java.
As a common example say there is a function which gets a list of id's from a local store and then queries a remote system for those Id's, the remote system results are then consolidated into a single report and returned to the caller of the function. As the call to the remote system is slow we want them to be done in asynchronously and I only want to return once all of the calls have returned and their results have been processed.
The only reliable way I have found to do this is to poll the subscription to check it is unsubscribed yet. But I'm thinking doesn't seem to be the 'RX' way to do things!
As an example I've taken the example from http://howrobotswork.wordpress.com/2013/10/28/using-rxjava-in-android/ and amended it slightly to make it non-android and to show what I mean. I have to have the following code at the of the main() method to stop it exiting immediately.
while (!subscription.isUnsubscribed()) {
Thread.sleep(100);
}
The full code for the example is listed below (it is dependent on http://square.github.io/retrofit/ if your trying to compile it)
package org.examples;
import retrofit.RestAdapter;
import retrofit.http.GET;
import retrofit.http.Query;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
public class AsyncExample {
public static void main(String[] args) throws InterruptedException {
final Subscription subscription = Observable.from("London", "Paris", "Berlin")
.flatMap(new Func1<String, Observable<WeatherData>>() {
@Override
public Observable<WeatherData> call(String s) {
return ApiManager.getWeatherData(s);
}
})
.subscribe(
new Action1<WeatherData>() {
@Override
public void call(WeatherData weatherData) {
System.out.println(Thread.currentThread().getName() + " - " + weatherData.name + ", " + weatherData.base);
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println(Thread.currentThread().getName() + " - ERROR: " + throwable.getMessage());
}
},
new Action0() {
@Override
public void call() {
System.out.println(Thread.currentThread().getName() + " - COMPLETED");
}
}
);
// Have to poll subscription to check if its finished - is this the only way to do it?
while (!subscription.isUnsubscribed()) {
Thread.sleep(100);
}
}
}
class ApiManager {
private interface ApiManagerService {
@GET("/weather")
WeatherData getWeather(@Query("q") String place, @Query("units") String units);
}
private static final RestAdapter restAdapter = new RestAdapter.Builder()
.setEndpoint("http://api.openweathermap.org/data/2.5")
.build();
private static final ApiManagerService apiManager = restAdapter.create(ApiManagerService.class);
public static Observable<WeatherData> getWeatherData(final String city) {
return Observable.create(new Observable.OnSubscribe<WeatherData>() {
@Override
public void call(Subscriber<? super WeatherData> subscriber) {
try {
System.out.println(Thread.currentThread() + " - Getting " + city);
subscriber.onNext(apiManager.getWeather(city, "metric"));
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.io());
}
}
If you are asking how to turn an asynchronous Observable into a synchronous one you can use .toBlocking() and then use .last() to wait for the Observable to complete.. For example, the following will not continue until the timer completes even though the timer executes on the computation thread.
try {
Observable
.timer(10, TimeUnit.SECONDS)
.toBlocking()
.last(); // Wait for observable to complete. Last item discarded.
} catch (IllegalArgumentException ex) {
// No items or error.
}
You probably shouldn't use this method unless absolutely necessary. Normally, application termination would be controlled by some other event (key press, close menu item click, etc.) Your network request Observables would complete asynchronously and you would take action in the onNext(), onCompleted() and onError() calls to update the UI or display an error.
Also, the beauty of Retrofit is that it has built in support for RxJava and will execute network requests in the background. To use that support declare you interface as returning an Observable.
interface ApiManagerService {
@GET("/weather")
Observable<WeatherData> getWeather(@Query("q") String place, @Query("units") String units);
}
Which allows you to simplify your getWeatherData() method to this:
public static Observable<WeatherData> getWeatherData(final String city) {
return apiManager.getWeather(city, "metric");
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With