I want to create a method that accepts an S and returns an Observable<T> that completes with a single value after three (async) tasks are done.
The three tasks all run on Queues with their own consumers in separate threads.
Here's what I was thinking of:
Task 1 receives a <S, Subject<T>> via the queue, computes the value from S, sets it in the Subject<T> with onNext, and then calls onComplete().
Tasks 2 and 3 each receive a <S, Subject<Void>> and call onComplete() once they've done their job.
Tasks 1, 2 and 3 can run independently and complete in any order.
Now I have three Subjects: one of T and two of Void. I want to return the first one, but only let it emit the value T once all tasks are done, because I don't want any subscriber of the observable to do anything with T until all tasks have completed.
What is the correct way to combine the Subjects to achieve this behavior? I can easily hack this together using CountDownLatch etc., but I was hoping there's an Rx-native way to tackle this.
And is my plan to use Subjects as callbacks via the queue the right approach? I used to use CompletableFuture<T> for this, but I want to migrate to Rx.
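For reference, the gating behavior described above can be sketched with plain CompletableFuture, the approach being migrated away from. This is only a minimal sketch under assumptions: the class name GatedResult, the stubbed task1/task2/task3 bodies, the int input, and the sideTasksDone counter are all hypothetical stand-ins for the real queue-backed tasks.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class GatedResult {
    // Counts finished side tasks so the gating can be observed (illustrative only).
    static final AtomicInteger sideTasksDone = new AtomicInteger();

    // Hypothetical stand-ins for the three queue-backed tasks.
    static String task1(int s) { return "value-" + s; }
    static void task2(int s) { pause(50); sideTasksDone.incrementAndGet(); }
    static void task3(int s) { pause(20); sideTasksDone.incrementAndGet(); }

    // Simulates work that takes a while.
    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns a future that yields task 1's value only after all three tasks finish.
    static CompletableFuture<String> get(int s, ExecutorService pool) {
        CompletableFuture<String> t1 = CompletableFuture.supplyAsync(() -> task1(s), pool);
        CompletableFuture<Void> t2 = CompletableFuture.runAsync(() -> task2(s), pool);
        CompletableFuture<Void> t3 = CompletableFuture.runAsync(() -> task3(s), pool);
        // allOf completes once every future is done, so t1.join() does not block here.
        return CompletableFuture.allOf(t1, t2, t3).thenApply(ignored -> t1.join());
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            String value = get(7, pool).join();
            System.out.println(value + " after " + sideTasksDone.get() + " side tasks");
        } finally {
            pool.shutdown();
        }
    }
}
```

Task 1 finishes almost immediately in this sketch, but its value is withheld until the two slower side tasks have also completed, which is the behavior the returned Observable<T> should reproduce.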
I'm not entirely sure where the Subject comes into play, but you can synchronize the three tasks by using When + And + Then:
public IObservable<T> MyMethod<S, T>(S incoming) {
    // Create a new plan
    return Observable.When(
            // Start with Task one, which will return a T from an S
            Observable.FromAsync(async () => await SomeTaskToTurnSIntoT(incoming))
                // Add in Task two, which returns a System.Reactive.Unit
                .And(Observable.FromAsync(() => /*Do Task 2*/))
                // Same for Task 3
                .And(Observable.FromAsync(() => /*Do Task 3*/))
                // Only emit the item from the first Task.
                .Then((ret, _, __) => ret))
        // Finally, we only want this to process once, then we will reuse the
        // existing value for subsequent subscribers
        .PublishLast().RefCount();
}
The above will wait until all three items are completed before it emits. One thing to note is that in Rx the Void object is really a System.Reactive.Unit, so you should be returning that if there is no value.
Don't use Subjects for this. Instead, merge the observables and run each one on an asynchronous scheduler. Suppose you have:
T task1(S s);
void task2(S s);
void task3(S s);
Then
<S,T> Observable<T> get(S s) {
    return Observable.merge(
            // task 1 produces the value
            Observable.just(s)
                .map(x -> task1(x))
                .subscribeOn(Schedulers.computation()),
            // tasks 2 and 3 emit nothing; they only contribute their completion
            (Observable<T>) Observable.just(s)
                .doOnNext(x -> task2(x))
                .ignoreElements()
                .cast(Object.class)
                .subscribeOn(Schedulers.computation()),
            (Observable<T>) Observable.just(s)
                .doOnNext(x -> task3(x))
                .ignoreElements()
                .cast(Object.class)
                .subscribeOn(Schedulers.computation()))
        // wait for completion before emitting the single value
        .last();
}