The scenario I am working on is composed by 3 Observables.
StartObs: this Observable emits when I need to start a sequence of processings - the data emitted is a processID of the process I need to fulfill
DoStuffObs: this Observable emits commands upon which I have to do something - I want to start listening to such Observable just after StartObs has emitted and I need the processID of the process to perform my duties with the function doTheWork(command, processId)
EndObs: this Observable emits when I have to end the processing of a certain processID and have to go back to listen to the next emission of StartObs
So basically is: Start, DoStuff until End and then go back to listening to the next Start.
It is also guaranteed that after one Start comes sooner or later one End and that it is not possible to have 2 Start without an End in between or 2 End without a Start in between.
The first 2 steps can be achieved via switchMap
, like
StartObs
.switchMap(processId => DoStuff.map(command => ({command, processId})))
.tap(data => doTheWork(data.command, data.processId))
What is not clear to me is how to deal with EndObs.
The option of using takeUntil
does not work, since I do not want to complete the chain started with StartObs since I have to go back to listening for the next process to start.
Actually, I think takeUntil()
is the best choice here in combination with repeat()
.
StartObs
.switchMap(processId => DoStuff.map(command => ({command, processId})))
.tap(data => doTheWork(data.command, data.processId))
.takeUntil(EndObs)
.repeat();
When the chain completes with takeUntil()
it will immediately resubscribe thanks to repeat()
and the whole process will start all over again.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With