
Create one observable from several tasks

I want to create a method that accepts an S and returns an Observable<T> that completes with a single value once three (async) tasks are done.

The three tasks all run on Queues with their own consumers in separate threads.

Here's what I was thinking of:

  • Task 1 receives <S, Subject<T>> via the queue, computes the value from S, sets it on the Subject<T> with onNext, and then calls onComplete().
  • Tasks 2 and 3 each receive an <S, Subject<Void>> and call onComplete() once they've done their job.

Tasks 1, 2 and 3 run independently and may complete in any order.

Now I have three Subjects: one of T and two of Void. I want to return the first one, but only let it emit the value T once all tasks are done, because I don't want any subscriber of the observable to do anything with T until all tasks have completed.

What is the correct way to combine the Subjects to achieve this behavior? I can easily hack this together using CountDownLatch etc., but I was hoping there's an Rx-native way to tackle this.

And is my plan to use Subjects as callbacks via the queue the right approach? I used to use CompletableFuture<T> for this, but I want to migrate to Rx.
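For comparison, the CompletableFuture-style gating mentioned above could be sketched roughly as follows. This is only an illustration of the "emit T once all three tasks are done" requirement in plain Java, not the original code: GateDemo, whenAllDone, and the stand-in tasks are hypothetical names, and a fixed thread pool stands in for the real queues and consumers.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

class GateDemo {

    // Hypothetical helper: completes with task1's result, but only after
    // task1, task2 and task3 have all finished (in any order).
    static <S, T> CompletableFuture<T> whenAllDone(
            S input,
            Function<S, T> task1,
            Consumer<S> task2,
            Consumer<S> task3,
            ExecutorService pool) {
        CompletableFuture<T> value =
                CompletableFuture.supplyAsync(() -> task1.apply(input), pool);
        CompletableFuture<Void> side2 =
                CompletableFuture.runAsync(() -> task2.accept(input), pool);
        CompletableFuture<Void> side3 =
                CompletableFuture.runAsync(() -> task3.accept(input), pool);

        // allOf completes once all three are done; join() does not block here
        // because value is already complete when the callback runs.
        return CompletableFuture.allOf(value, side2, side3)
                .thenApply(ignored -> value.join());
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        AtomicInteger sideEffects = new AtomicInteger();
        try {
            String result = whenAllDone(
                    "abc",
                    String::length,                      // stand-in for task 1: S -> T
                    s -> sideEffects.incrementAndGet(),  // stand-in for task 2
                    s -> sideEffects.incrementAndGet(),  // stand-in for task 3
                    pool)
                .thenApply(len -> "length=" + len + ", sideEffects=" + sideEffects.get())
                .join();
            System.out.println(result); // prints "length=3, sideEffects=2"
        } finally {
            pool.shutdown();
        }
    }
}
```

Downstream stages attached with thenApply only see the value after all three futures have completed, which is the behavior the returned Observable<T> is meant to reproduce.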

Alex asked May 18 '26 19:05


2 Answers

I'm not entirely sure where the Subject comes into play, but you can synchronize the three tasks using When + And + Then (shown here in C# with Rx.NET):

// Requires: using System; using System.Reactive.Linq;
// DoTask2Async and DoTask3Async are hypothetical placeholders that return a Task.
public IObservable<T> MyMethod<S, T>(S incoming) {

  // Create a new plan
  return Observable.When(

   // Start with task one, which turns an S into a T
   Observable.FromAsync(async () => await SomeTaskToTurnSIntoT(incoming))

   // Add task two, which yields a System.Reactive.Unit
   .And(Observable.FromAsync(() => DoTask2Async(incoming)))

   // Same for task three
   .And(Observable.FromAsync(() => DoTask3Async(incoming)))

   // Only emit the item from the first task
   .Then((ret, _, __) => ret))

   // Finally, we only want this to be processed once; the existing
   // value is then reused for subsequent subscribers
   .PublishLast().RefCount();
}

The above waits until all three tasks have completed before it emits. One thing to note: Rx.NET represents "no value" with System.Reactive.Unit rather than Void, so that is what you should return when a task produces no value.

paulpdaniels answered May 23 '26 16:05


Don't use Subjects for this. Instead, merge the observables and subscribe each one on an asynchronous scheduler. Suppose you have:

T task1(S s);
void task2(S s);
void task3(S s);

Then:

<S,T> Observable<T> get(S s) {
    return Observable.merge(
        Observable.just(s)
            .map(x -> task1(x))
            .subscribeOn(Schedulers.computation()),
        (Observable<T>) Observable.just(s)
            .doOnNext(x -> task2(x))
            .ignoreElements()
            .cast(Object.class)
            .subscribeOn(Schedulers.computation()),
        (Observable<T>) Observable.just(s)
            .doOnNext(x -> task3(x))
            .ignoreElements()
            .cast(Object.class)
            .subscribeOn(Schedulers.computation()))
        // wait for completion before emitting the single value
        .last();
}   

Dave Moten answered May 23 '26 16:05


