Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJava subscribe to blocking observable

Tags:

rx-java

I'd like to achieve the following:

String result = myObservable.toBlocking().first();

i.e. it is like a regular function call. However this never happens because you'd need to subscribe to it, which I don't know how to do. If I subscribe to it, the result will be in another scope, and the code is very ugly because I can only get the result like its a regular observable anyway, so there's no point turning it into a blocking observable.

like image 381
breakline Avatar asked Sep 16 '16 02:09

breakline


People also ask

Is Observable subscribe blocking?

Calling subscribe() on a single-threaded observable will block until the entire observable chain has been executed.

How do you block Observable?

A Blocking Observable extends the ordinary Observable class by providing a set of operators on the items emitted by the Observable that block. To transform an Observable into a BlockingObservable , use the Observable. toBlocking( ) method or the BlockingObservable. from( ) method.

How do I stop Observable RxJava?

using() and subscribe to the Observable with a Subscriber so you can call subscriber. unsubscribe() when the user clicks the button to cancel. Here's an outline. This will stop the observable stream and call socketDisposer() to stop the network activity.

What is toBlocking RxJava?

toBlocking() converts the Observable into a BlockingObservable , and calling BlockingObservable. single() returns whatever item is emitted by the BlockingObservable , as long as it only emits one item before completing (more or less than one results in an error being emitted).


1 Answers

It actually works as you want:

    Observable<String> myObservable = Observable.just("firstValue", "secondValue");
    String result = myObservable.toBlocking().first();
    System.out.println(result); // ---> "firstValue"

Under the hood, calling BlockingObservable.first() does the subscription for you:

private T blockForSingle(final Observable<? extends T> observable) {
    final AtomicReference<T> returnItem = new AtomicReference<T>();
    final AtomicReference<Throwable> returnException = new AtomicReference<Throwable>();
    final CountDownLatch latch = new CountDownLatch(1);

    @SuppressWarnings("unchecked")
    Subscription subscription = ((Observable<T>)observable).subscribe(new Subscriber<T>() {
        @Override
        public void onCompleted() {
            latch.countDown();
        }

        @Override
        public void onError(final Throwable e) {
            returnException.set(e);
            latch.countDown();
        }

        @Override
        public void onNext(final T item) {
            returnItem.set(item);
        }
    });
    BlockingUtils.awaitForComplete(latch, subscription);

    if (returnException.get() != null) {
        Exceptions.propagate(returnException.get());
    }

    return returnItem.get();
}

UPDATE: If doesn't make any sense to use a BehaviourSubject plus toBlocking(). Have into account that it is both and Observable and an Observer so somewhere, myObservable.onNext("value") should be invoked. If you block the thread by calling toBlocking(), unless myObservable is available in some other thread where onNext() is called, you are gonna get blocked.

For instance, this is the normal use of a `BehaviourSubject:

  // observer will receive the "one", "two" and "three" events, but not "zero"
  BehaviorSubject<Object> subject = BehaviorSubject.create("default");
  subject.onNext("zero");
  subject.onNext("one");
  subject.subscribe(observer);
  subject.onNext("two");
  subject.onNext("three");
like image 120
codependent Avatar answered Oct 29 '22 03:10

codependent