Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Process a stream of events after a Start event and return to listen when an End event occurs

Tags:

rxjs

The scenario I am working on is composed by 3 Observables.

StartObs: this Observable emits when I need to start a sequence of processings - the data emitted is a processID of the process I need to fulfill

DoStuffObs: this Observable emits commands upon which I have to do something - I want to start listening to such Observable just after StartObs has emitted and I need the processID of the process to perform my duties with the function doTheWork(command, processId)

EndObs: this Observable emits when I have to end the processing of a certain processID and have to go back to listen to the next emission of StartObs

So basically is: Start, DoStuff until End and then go back to listening to the next Start.

It is also guaranteed that after one Start comes sooner or later one End and that it is not possible to have 2 Start without an End in between or 2 End without a Start in between.

The first 2 steps can be achieved via switchMap, like

StartObs
.switchMap(processId => DoStuff.map(command => ({command, processId})))
.tap(data => doTheWork(data.command, data.processId))

What is not clear to me is how to deal with EndObs.

The option of using takeUntil does not work, since I do not want to complete the chain started with StartObs since I have to go back to listening for the next process to start.

like image 774
Picci Avatar asked Sep 18 '25 13:09

Picci


1 Answers

Actually, I think takeUntil() is the best choice here in combination with repeat().

StartObs
  .switchMap(processId => DoStuff.map(command => ({command, processId})))
  .tap(data => doTheWork(data.command, data.processId))
  .takeUntil(EndObs)
  .repeat();

When the chain completes with takeUntil() it will immediately resubscribe thanks to repeat() and the whole process will start all over again.

like image 165
martin Avatar answered Sep 23 '25 13:09

martin