Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Combining Unknown number of Observables in RxJava

I have a situation where I have to create N Observable objects based on a request. I understand that I can use zip to combine a KNOWN number of Observables. But, I'm stuck trying to understand how I can combine an unknown number of Observable.

The part where I'm not clear is what function to pass to zip. Depending on the number of Observables, I have to create a lambda that takes N arguments.

All the Observable return different types of objects.

like image 297
Vanchinathan Chandrasekaran Avatar asked Dec 21 '16 23:12

Vanchinathan Chandrasekaran


2 Answers

Update:

I ended up with the normal way to solve parallel request list. Just use flatMap, merge, zip, any combining rx operator.

The only thing we need to do specially is to use .subscribeOn(Schedulers.io()) for each request. All other things, sending parallel requests or simultaneous would be arranged by rxjava perfectly.

If you want to see the effect, try following:

private void runMyTest() {
    List<Single<String>> singleObservableList = new ArrayList<>();
    singleObservableList.add(getSingleObservable(500, "AAA"));
    singleObservableList.add(getSingleObservable(300, "BBB"));
    singleObservableList.add(getSingleObservable(100, "CCC"));
    Single.merge(singleObservableList)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(System.out::println); }

private Single<String> getSingleObservable(long waitMilliSeconds, String name) {
    return Single
            .create((SingleOnSubscribe<String>) e -> {
                    try {
                        Thread.sleep(waitMilliSeconds);
                    } catch (InterruptedException exception) {
                        exception.printStackTrace();
                    }
                    System.out.println("name = " +name+ ", waitMilliSeconds = " +waitMilliSeconds+ ", thread name = "
+Thread.currentThread().getName()+ ", id =" +Thread.currentThread().getId());
                    if(!e.isDisposed()) e.onSuccess(name);
                })
            .subscribeOn(Schedulers.io()); }

output:

System.out: name = CCC, waitMilliSeconds = 100, thread name = RxCachedThreadScheduler-4, id =463

System.out: CCC

System.out: name = BBB, waitMilliSeconds = 300, thread name = RxCachedThreadScheduler-3, id =462

System.out: BBB

System.out: name = AAA, waitMilliSeconds = 500, thread name = RxCachedThreadScheduler-2, id =461

System.out: AAA

// ******previous answer but not accurate*******//

Use this one; solved my problem:

zip(java.lang.Iterable<? extends Observable<?>> ws,FuncN<? extends R> zipFunction) method.

An sample:

public Observable<CombinedData> getCombinedObservables() {
        List<Observable> observableList = new ArrayList<>();

        observableList.add(observable1);
        observableList.add(observable2);
        observableList.add(observable3);
        observableList.add(observable4);

        return Observable.zip(observableList, new Function<Object[], CombinedData>() {
            @Override
            public CombinedData apply(Object[] objects) throws Exception {
                return new CombinedData(...);
            }
        });
    }
like image 199
Bin Fan Avatar answered Oct 19 '22 06:10

Bin Fan


You can make your Observables return objects of common type, merge it and process as List:

class Result1 implements Result
class Result2 implements Result
class REsult3 implements Result

for(Observable o : yourObservableList)
    resultObservable.mergeWith(o) //use concat() if you need serial execution

resultObservable
    .toList()
    .doOnNext(results -> {
        //process your results (List<Result>)
    }
    .subscribe(...)
like image 34
Maksim Ostrovidov Avatar answered Oct 19 '22 06:10

Maksim Ostrovidov