I have a process that sends me packets of data at intervals, and I need to manage that stream based on the timing of when the packets arrive and so on. At some point I also close the stream and the process.
Right now, I'm using a set of timers to do this, but I hope I can do it with rxjs
since it seems a very good fit for this kind of thing. So far, I haven't had much success.
The stream is supposed to send me packets at regular intervals, but it usually deviates a lot and sometimes gets stuck.
I want to close the stream at some point, under the following conditions:
startDelay
to send me the first packet.middleDelay
between two packets.maxChannelTime
.When I'm about to close the stream due to any of the above reasons, I first request it to close politely so it can do some cleanup. Sometimes it also sends me a final data packet during the cleanup. But I want to wait no longer than cleanupTime
for the cleanup and last data to arrive before I close the stream and ignore any more messages.
I'll create the "streams" by wrapping an event with an Observable. I have no trouble doing that.
By "closing" a stream, I mean telling the process to stop emitting data, and possibly to close (i.e. die).
Tricky problem.
I've broken it down to two phases - 'regulated' (since we want to check for regular intervals), and 'cleanup'.
Working backwards, the output is
const regulated = source.takeUntil(close)
const cleanup = source.skipUntil(close).takeUntil(cleanupCloser)
const output = regulated.merge(cleanup)
'Closers' are observables that emit when it's time to close (one closer per timeout value).
const startTimeout = 600
const intervalTimeout = 200
const maxtimeTimeout = 3000
const cleanupTimeout = 300
const startCloser = Observable.timer(startTimeout) // emit once after initial delay
.takeUntil(source) // cancel after source emits
.mapTo('startTimeoutMarker')
const intervalCloser = source.switchMap(x => // reset interval after each source emit
Observable.timer(intervalTimeout) // emit once after intervalTimeout
.mapTo('intervalTimeoutMarker')
)
const maxtimeCloser = Observable.timer(maxtimeTimeout) // emit once after maxtime
.takeUntil(startCloser) // cancel if startTimeout
.takeUntil(intervalCloser) // cancel if intervalTimeout
.mapTo('maxtimeTimeoutMarker')
const close = Observable.merge(startCloser, intervalCloser, maxtimeCloser).take(1)
const cleanupCloser = close.switchMap(x => // start when close emits
Observable.timer(cleanupTimeout) // emit once after cleanup time
)
.mapTo('cleanupTimeoutMarker')
Here's a working sample CodePen (please run tests one at a time)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With