I'd like to achieve the following:
String result = myObservable.toBlocking().first();
i.e. it is like a regular function call. However this never happens because you'd need to subscribe to it, which I don't know how to do. If I subscribe to it, the result will be in another scope, and the code is very ugly because I can only get the result like its a regular observable anyway, so there's no point turning it into a blocking observable.
Calling subscribe() on a single-threaded observable will block until the entire observable chain has been executed.
A Blocking Observable extends the ordinary Observable class by providing a set of operators on the items emitted by the Observable that block. To transform an Observable into a BlockingObservable , use the Observable. toBlocking( ) method or the BlockingObservable. from( ) method.
using() and subscribe to the Observable with a Subscriber so you can call subscriber. unsubscribe() when the user clicks the button to cancel. Here's an outline. This will stop the observable stream and call socketDisposer() to stop the network activity.
toBlocking() converts the Observable into a BlockingObservable , and calling BlockingObservable. single() returns whatever item is emitted by the BlockingObservable , as long as it only emits one item before completing (more or less than one results in an error being emitted).
It actually works as you want:
Observable<String> myObservable = Observable.just("firstValue", "secondValue");
String result = myObservable.toBlocking().first();
System.out.println(result); // ---> "firstValue"
Under the hood, calling BlockingObservable.first()
does the subscription for you:
private T blockForSingle(final Observable<? extends T> observable) {
final AtomicReference<T> returnItem = new AtomicReference<T>();
final AtomicReference<Throwable> returnException = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);
@SuppressWarnings("unchecked")
Subscription subscription = ((Observable<T>)observable).subscribe(new Subscriber<T>() {
@Override
public void onCompleted() {
latch.countDown();
}
@Override
public void onError(final Throwable e) {
returnException.set(e);
latch.countDown();
}
@Override
public void onNext(final T item) {
returnItem.set(item);
}
});
BlockingUtils.awaitForComplete(latch, subscription);
if (returnException.get() != null) {
Exceptions.propagate(returnException.get());
}
return returnItem.get();
}
UPDATE: If doesn't make any sense to use a BehaviourSubject
plus toBlocking()
. Have into account that it is both and Observable
and an Observer
so somewhere, myObservable.onNext("value")
should be invoked. If you block the thread by calling toBlocking()
, unless myObservable
is available in some other thread where onNext()
is called, you are gonna get blocked.
For instance, this is the normal use of a `BehaviourSubject:
// observer will receive the "one", "two" and "three" events, but not "zero"
BehaviorSubject<Object> subject = BehaviorSubject.create("default");
subject.onNext("zero");
subject.onNext("one");
subject.subscribe(observer);
subject.onNext("two");
subject.onNext("three");
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With