
How can I make this RxJava zip run in parallel?

I have a sleep method for simulating a long-running process.

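private void sleep() {
    try {
        Thread.sleep(2000); // simulate 2 seconds of work
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
    }
}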

Then I have a method that returns an Observable containing a list of the two strings passed in as parameters. It calls sleep before returning the strings.

private Observable<List<String>> getStrings(final String str1, final String str2) {
    return Observable.fromCallable(new Callable<List<String>>() {
        public List<String> call() {
            sleep(); // simulate the long-running work before producing the result
            List<String> strings = new ArrayList<>(Arrays.asList(str1, str2));
            return strings;
        }
    });
}

Then I call getStrings three times inside Observable.zip. I expect the three calls to run in parallel, so the total execution time should be about 2 seconds, or 3 seconds at most, because each sleep is only 2 seconds. However, it takes a total of six seconds. How can I make these calls run in parallel so that the whole thing finishes within 2 seconds?

Observable
    .zip(getStrings("One", "Two"), getStrings("Three", "Four"), getStrings("Five", "Six"), mergeStringLists())
    .subscribe(new Observer<List<String>>() {
        public void onCompleted() {
        }

        public void onError(Throwable e) {
        }

        public void onNext(List<String> strings) {
            // Display the strings
        }
    });

The mergeStringLists method

private Func3<List<String>, List<String>, List<String>, List<String>> mergeStringLists() {
    return new Func3<List<String>, List<String>, List<String>, List<String>>() {
        public List<String> call(List<String> strings, List<String> strings2, List<String> strings3) {
            Log.d(TAG, "...");

            // Append the second and third lists to the first one
            for (String s : strings2) {
                strings.add(s);
            }

            for (String s : strings3) {
                strings.add(s);
            }

            return strings;
        }
    };
}
s-hunter asked Jul 06 '16 21:07


2 Answers

That happens because the whole zipped observable is subscribed to on the same io thread, so the three sources run one after another on that thread.

Why don't you try this instead:

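    Observable
        .zip(
            // Each source gets its own subscribeOn so the three calls can run concurrently
            getStrings("One", "Two").subscribeOn(Schedulers.newThread()),
            getStrings("Three", "Four").subscribeOn(Schedulers.newThread()),
            getStrings("Five", "Six").subscribeOn(Schedulers.newThread()),
            mergeStringLists())
        .subscribe(new Observer<List<String>>() {
            public void onCompleted() {
            }

            public void onError(Throwable e) {
            }

            public void onNext(List<String> strings) {
                // Display the strings
            }
        });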

Let me know if that helped.

Bartek Lipinski answered Oct 18 '22 19:10

Here is an example I wrote that uses zip asynchronously, in case you're curious:

private Scheduler scheduler, scheduler1, scheduler2;

/**
 * Since every observable passed to zip is set up to subscribeOn a different thread, all of them run in parallel.
 * By default Rx is not async; it only becomes async if you explicitly use subscribeOn.
 */
public void testAsyncZip() {
    scheduler = Schedulers.newThread();
    scheduler1 = Schedulers.newThread();
    scheduler2 = Schedulers.newThread();
    long start = System.currentTimeMillis();
    Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2).concat(s3))
            .subscribe(result -> showResult("Async in:", start, result));
}

public Observable<String> obAsyncString() {
    return Observable.just("")
            .subscribeOn(scheduler) // run this source on its own scheduler, as the comment above describes
            .doOnNext(val -> System.out.println("Thread " + Thread.currentThread().getName()))
            .map(val -> "Hello");
}

public Observable<String> obAsyncString1() {
    return Observable.just("")
            .subscribeOn(scheduler1)
            .doOnNext(val -> System.out.println("Thread " + Thread.currentThread().getName()))
            .map(val -> " World");
}

public Observable<String> obAsyncString2() {
    return Observable.just("")
            .subscribeOn(scheduler2)
            .doOnNext(val -> System.out.println("Thread " + Thread.currentThread().getName()))
            .map(val -> "!");
}

You can see more examples here https://github.com/politrons/reactive
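
To see the timing difference without any RxJava dependency, here is a minimal plain-JDK sketch. It is only an analogy, not how RxJava works internally: the class name ZipTimingSketch and its methods are made up for illustration, and CompletableFuture stands in for "each source subscribed on its own thread". It runs the same three "sleep, then return two strings" tasks once on the calling thread and once with each task on its own pool thread, then merges the results in order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ZipTimingSketch {

    // Hypothetical stand-in for getStrings(): block for delayMs, then return both strings.
    public static List<String> getStrings(String a, String b, long delayMs) {
        try {
            Thread.sleep(delayMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(Arrays.asList(a, b));
    }

    // Plays the role of mergeStringLists(): concatenate the three lists in order.
    public static List<String> merge(List<String> l1, List<String> l2, List<String> l3) {
        List<String> out = new ArrayList<>(l1);
        out.addAll(l2);
        out.addAll(l3);
        return out;
    }

    // All three calls run on the calling thread, one after another.
    public static List<String> sequential(long delayMs) {
        return merge(getStrings("One", "Two", delayMs),
                     getStrings("Three", "Four", delayMs),
                     getStrings("Five", "Six", delayMs));
    }

    // Each call runs on its own pool thread; the results are joined and merged afterwards.
    public static List<String> parallel(long delayMs) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            CompletableFuture<List<String>> f1 =
                    CompletableFuture.supplyAsync(() -> getStrings("One", "Two", delayMs), pool);
            CompletableFuture<List<String>> f2 =
                    CompletableFuture.supplyAsync(() -> getStrings("Three", "Four", delayMs), pool);
            CompletableFuture<List<String>> f3 =
                    CompletableFuture.supplyAsync(() -> getStrings("Five", "Six", delayMs), pool);
            return merge(f1.join(), f2.join(), f3.join());
        } finally {
            pool.shutdown();
        }
    }

    // Wall-clock time of a task in milliseconds.
    public static long timeMs(Runnable task) {
        long start = System.nanoTime();
        task.run();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        long delayMs = 2000; // same sleep as in the question
        System.out.println("sequential: " + timeMs(() -> sequential(delayMs)) + " ms");
        System.out.println("parallel:   " + timeMs(() -> parallel(delayMs)) + " ms");
    }
}
```

With the 2000 ms delay from the question, the sequential variant should take roughly three times as long as the parallel one, which mirrors the six seconds versus two seconds discussed above.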

paul answered Oct 18 '22 19:10
