
RxJava Return Subscribed value in a function

Sometimes I want to use Rx operators instead of the normal Java way of transforming data structures, because the result is cleaner. For example:

Observable.from(listOfStrings)
.filter(string -> string.getNumber() >5)
.toList()

Is there any way to wait for the result of the observable and return it from a function? Something like this would do (but it doesn't work):

private String getFilteredString(String string){
   Observable.from(listOfStrings)
       .filter(string -> string.getNumber() >5)
       .toList()
       .subscribe(finalString -> {
       return finalString;
       })
}
johnny_crq asked Sep 29 '15 16:09



2 Answers

You can transform any Observable to a synchronous one using .toBlocking():

private List<String> getFilteredString(List<String> strings) {
  return Observable.from(strings)
      .filter(string -> string.length() > 5)
      .toList()
      .toBlocking()
      .single();
}

UPDATE: Although the code above is perfectly valid and will work as expected, this isn't how RxJava is supposed to be used. If your only goal is to transform a collection, there are better ways to do it:

  • Java 8 has a Stream API
  • Guava has its own collections API
  • Apache has a commons-collections library
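As a rough illustration of the first alternative, the same length filter could be written with the Java 8 Stream API. This is only a sketch: the class name StreamFilterExample, the helper filterLongerThan and the sample input are made up for this example and are not from the question.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class StreamFilterExample {

    // Hypothetical helper: keeps only the strings longer than minLength.
    // The work runs synchronously on the calling thread, so the result
    // can simply be returned.
    static List<String> filterLongerThan(List<String> strings, int minLength) {
        return strings.stream()
                .filter(s -> s.length() > minLength)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("rx", "observable", "stream", "list");
        System.out.println(filterLongerThan(input, 5)); // prints [observable, stream]
    }
}
```

Because a stream pipeline is evaluated synchronously when collect() is called, there is nothing to block on, which is why it fits plain collection transformations better than an Observable.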
Vladimir Mironov answered Nov 05 '22 23:11



There is an instance method on rx.Observable called x() that can be used to convert an observable to another value directly and fluently. The intent of this operator is to build up a repository of conversion functions that can turn an Observable into other kinds of concurrent or asynchronous data structures (such as other kinds of observables), or simply unwrap the contents of the observable into the contained values. Write the conversion function once and store it away for repeated use.

Method signature:

public <R> R x(Func1<? super OnSubscribe<T>, ? extends R> conversion);

Usage for converting a potentially asynchronous observable to a list, blocking until the observable completes:

List<Integer> list = Observable.range(1, 10).x(new Func1<OnSubscribe<Integer>, List<Integer>>() {
    @Override
    public List<Integer> call(OnSubscribe<Integer> onSubscribe) {
        List<Integer> allElements = new ArrayList<Integer>();
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<Throwable> failure = new AtomicReference<Throwable>();
        onSubscribe.call(new Subscriber<Integer>(){
            @Override
            public void onCompleted() {
                latch.countDown();
            }

            @Override
            public void onError(Throwable e) {
                failure.set(e);
                // Release the waiting thread; otherwise latch.await() never returns on error.
                latch.countDown();
            }

            @Override
            public void onNext(Integer t) {
                allElements.add(t);
            }});
        while (true) {
            try {
                latch.await();
                break;
            } catch (InterruptedException e1) {
                // continue waiting
            }
        }
        Throwable e = failure.get();
        if (e != null)
            throw new RuntimeException(e);
        return allElements;

    }});
System.out.println(list);

Output:

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Please note that this operator is currently @Experimental and is very likely to be renamed (to "extend"). Experimental functionality in RxJava can change in any release.

Aaron answered Nov 05 '22 23:11
