I have a sleep method for simulating a long-running process.
private void sleep() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Then I have a method that returns an Observable emitting a list of the 2 strings passed in as parameters. It calls sleep before returning the strings.
private Observable<List<String>> getStrings(final String str1, final String str2) {
return Observable.fromCallable(new Callable<List<String>>() {
@Override
public List<String> call() {
sleep();
List<String> strings = new ArrayList<>();
strings.add(str1);
strings.add(str2);
return strings;
}
});
}
Then I call getStrings three times in Observable.zip. I expect those three calls to run in parallel, so the total execution time should be around 2 seconds, or 3 seconds at most, because the sleep is only 2 seconds. However, it takes a total of six seconds. How can I make these calls run in parallel so that it finishes within 2 seconds?
Observable
.zip(getStrings("One", "Two"), getStrings("Three", "Four"), getStrings("Five", "Six"), mergeStringLists())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<List<String>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(List<String> strings) {
//Display the strings
}
});
The mergeStringLists method:
private Func3<List<String>, List<String>, List<String>, List<String>> mergeStringLists() {
return new Func3<List<String>, List<String>, List<String>, List<String>>() {
@Override
public List<String> call(List<String> strings, List<String> strings2, List<String> strings3) {
Log.d(TAG, "...");
for (String s : strings2) {
strings.add(s);
}
for (String s : strings3) {
strings.add(s);
}
return strings;
}
};
}
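That's happening because subscribeOn(Schedulers.io()) is applied to the zipped observable, so zip subscribes to all three sources one after another on the same io thread. Each source does its blocking 2-second sleep on that thread, which adds up to six seconds.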
Why don't you try this instead:
Observable
.zip(
getStrings("One", "Two")
.subscribeOn(Schedulers.newThread()),
getStrings("Three", "Four")
.subscribeOn(Schedulers.newThread()),
getStrings("Five", "Six")
.subscribeOn(Schedulers.newThread()),
mergeStringLists())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<List<String>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(List<String> strings) {
//Display the strings
}
});
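To make the timing difference visible without RxJava on the classpath, here is a rough plain-JDK sketch of the same idea. It is an analogy, not RxJava itself: CompletableFuture and a thread pool stand in for the observables and schedulers, and the names ZipTimingSketch, slowStrings, zipOn and timeMs are made up for this illustration. A pool with one thread plays the role of a single subscribeOn on the zipped observable, while a pool with three threads plays the role of one subscribeOn per source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ZipTimingSketch {

    // Blocking stand-in for getStrings(): sleeps, then returns both strings.
    static List<String> slowStrings(String a, String b, long delayMs) {
        try {
            Thread.sleep(delayMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag set
        }
        List<String> strings = new ArrayList<>();
        strings.add(a);
        strings.add(b);
        return strings;
    }

    // Starts the three sources on the given executor and merges the results in order.
    static List<String> zipOn(ExecutorService executor, long delayMs) {
        CompletableFuture<List<String>> f1 =
                CompletableFuture.supplyAsync(() -> slowStrings("One", "Two", delayMs), executor);
        CompletableFuture<List<String>> f2 =
                CompletableFuture.supplyAsync(() -> slowStrings("Three", "Four", delayMs), executor);
        CompletableFuture<List<String>> f3 =
                CompletableFuture.supplyAsync(() -> slowStrings("Five", "Six", delayMs), executor);

        List<String> merged = new ArrayList<>(f1.join());
        merged.addAll(f2.join());
        merged.addAll(f3.join());
        return merged;
    }

    // Measures how long zipOn takes on a pool with the given number of threads.
    static long timeMs(int threads, long delayMs) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        long start = System.nanoTime();
        try {
            zipOn(executor, delayMs);
        } finally {
            executor.shutdown();
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        // One thread: the three sleeps run back to back.
        System.out.println("one thread:    " + timeMs(1, 2000) + " ms");
        // Three threads: the three sleeps overlap.
        System.out.println("three threads: " + timeMs(3, 2000) + " ms");
    }
}
```

With a single thread the three sleeps cannot overlap, so the elapsed time is roughly three times the delay; with one thread per source it should be close to a single delay.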
Let me know if that helped
Here is an example I wrote that uses zip asynchronously, in case you're curious:
/**
* Since every observable passed to zip moves its work onto a different thread, all of them run in parallel.
* By default Rx is not async; it only becomes async when you explicitly use a scheduler (subscribeOn or, as here, observeOn).
*/
@Test
public void testAsyncZip() {
scheduler = Schedulers.newThread();
scheduler1 = Schedulers.newThread();
scheduler2 = Schedulers.newThread();
long start = System.currentTimeMillis();
Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2).concat(s3))
.subscribe(result -> showResult("Async in:", start, result));
}
public Observable<String> obAsyncString() {
return Observable.just("")
.observeOn(scheduler)
.doOnNext(val -> System.out.println("Thread " + Thread.currentThread().getName()))
.map(val -> "Hello");
}
public Observable<String> obAsyncString1() {
return Observable.just("")
.observeOn(scheduler1)
.doOnNext(val -> System.out.println("Thread " + Thread.currentThread().getName()))
.map(val -> " World");
}
public Observable<String> obAsyncString2() {
return Observable.just("")
.observeOn(scheduler2)
.doOnNext(val -> System.out.println("Thread " + Thread.currentThread().getName()))
.map(val -> "!");
}
You can see more examples here https://github.com/politrons/reactive