I'm trying to get the latest value of a given Observable
and get it to emit immediately once it's called. Given the code below as an example:
return Observable.just(myObservable.last()) .flatMap(myObservable1 -> { return myObservable1; }) .map(o -> o.x) // Here I want to end up with a T object instead of Observable<T> object
This does not work because by doing this the flatMap
will emit myObservable1
which in turn will have to emit to reach the map
. I don't know if doing such a thing is even possible. Does anyone have any clue on how to achieve this goal? Thank you
Rather than a standard subject (Observable) that just emits values as they come in, a BehaviorSubject emits the last value upon subscribe() . You can also get the last value manually using the BehaviorSubjects getValue() method.
You cannot 'extract' something from an observable. You get items from observable when you subscribe to them (if they emit any). Since the object you are returning is of type Observable, you can apply operators to transform your data to your linking.
To get current value of RxJS Subject or Observable, we can use the first method. const observable = of("foo"); const hasValue = (value: any) => { return value !== null && value !== undefined; }; const getValue = <T>(observable: Observable<T>): Promise<T> => { return observable.
last()
method will not be of any help here as it waits for the Observable to terminate to give you the last item emitted.
Assuming that you do not have the control over the emitting observable you could simply create a BehaviorSubject
and subscribe it to the observable that emits the data that you want to listen and then subscribe to the created subject. Since Subject
is both Observable
and Subscriber
you will get what you want.
I think (do not have the time to check it now) you may have to manually unsubscribe from the original observable as the BehaviorSubject
once all of his subscribers unsubscribe will not unsubscribe automatically.
Something like this:
BehaviorSubject subject = new BehaviorSubject(); hotObservable.subscribe(subject); subject.subscribe(thing -> { // Here just after subscribing // you will receive the last emitted item, if there was any. // You can also always supply the first item to the behavior subject });
http://reactivex.io/RxJava/javadoc/rx/subjects/BehaviorSubject.html
In RxJava, subscriber.onXXX is called asynchronous.It means that if your Observable emit items in new thread, you can never get the last item before return, except you block the thread and wait for the item.But if the Observable emit item synchronously and you dont' change it's thread by subscribeOn and observOn, such as the code:
Observable.just(1,2,3).subscribe();
In this case, you can get the last item by doing like this:
Integer getLast(Observable<Integer> o){ final int[] ret = new int[1]; Observable.last().subscribe(i -> ret[0] = i); return ret[0]; }
It's a bad idea doing like this.RxJava prefer you to do asynchronous work by it.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With