For some reason, I sometimes want to use Rx operators instead of the normal Java way of transforming data structures, because it's cleaner. For example:
Observable.from(listOfStrings)
    .filter(string -> string.getNumber() > 5)
    .toList()
Is there any way of waiting for the result of the observable and returning it from a function? This is what I would like to do (but it doesn't work):
private String getFilteredString(String string){
    Observable.from(listOfStrings)
        .filter(string -> string.getNumber() > 5)
        .toList()
        .subscribe(finalString -> {
            return finalString;
        })
}
You can transform any Observable to a synchronous one using .toBlocking():
private List<String> getFilteredString(List<String> strings) {
    return Observable.from(strings)
            .filter(string -> string.length() > 5)
            .toList()
            .toBlocking()
            .single();
}
UPDATE:
Although the code above is perfectly valid and will work as expected, this isn't how RxJava is supposed to be used. If your only goal is to transform a collection, there are better ways to do it:
Stream API
commons-collections library
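As a minimal sketch of the Stream API option, assuming Java 8 or later and the same length-based filter used in the toBlocking() example. The class name StreamFilterExample and the method name getFilteredStrings are made up for illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class StreamFilterExample {

    // Hypothetical helper: keeps only the strings longer than five
    // characters, preserving the order of the input list.
    static List<String> getFilteredStrings(List<String> strings) {
        return strings.stream()
                .filter(s -> s.length() > 5)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("rx", "observable", "java", "blocking");
        // Prints [observable, blocking]
        System.out.println(getFilteredStrings(input));
    }
}
```

Because a stream over an in-memory list is evaluated synchronously by the terminal collect() call, the method can simply return the result and no blocking step is needed.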
There is an instance method on rx.Observable called x() that can be used to convert an observable to another value directly and fluently. The intent of this operator is to build up a repository of conversion functions that can switch an Observable into other kinds of concurrent/asynchronous data structures (such as other kinds of observables) or simply unwrap the contents of the observable into the contained values. Write the conversion function once and store it away for repeated use.
Method signature:
public <R> R x(Func1<? super OnSubscribe<T>, ? extends R> conversion);
Usage for converting a potentially asynchronous observable to a concurrent list:
List<Integer> list = Observable.range(1, 10).x(new Func1<OnSubscribe<Integer>, List<Integer>>() {
    @Override
    public List<Integer> call(OnSubscribe<Integer> onSubscribe) {
        List<Integer> allElements = new ArrayList<Integer>();
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<Throwable> failure = new AtomicReference<Throwable>();
        onSubscribe.call(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                latch.countDown();
            }
            @Override
            public void onError(Throwable e) {
                failure.set(e);
                // Release the waiting thread on failure too; otherwise await() never returns.
                latch.countDown();
            }
            @Override
            public void onNext(Integer t) {
                allElements.add(t);
            }
        });
        while (true) {
            try {
                latch.await();
                break;
            } catch (InterruptedException e1) {
                // continue waiting
            }
        }
        Throwable e = failure.get();
        if (e != null)
            throw new RuntimeException(e);
        return allElements;
    }
});
System.out.println(list);
Output:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Please note that this operator is currently @Experimental and is very likely to be renamed (to "extend"). Experimental functionality in RxJava can be changed in any release.