
Handling a stream of events based on timing in rxjs

I have a process that sends me packets of data at intervals, and I need to manage that stream based on when the packets arrive. At some point I also close the stream and the process.

Right now, I'm using a set of timers to do this, but I hope I can do it with rxjs, since it seems like a very good fit for this kind of thing. So far, I haven't had much success.

The problem

The stream is supposed to send me packets at regular intervals, but it usually deviates a lot and sometimes gets stuck.

I want to close the stream at some point, under the following conditions:

  1. If it takes more than startDelay to send me the first packet.
  2. After the first packet is sent, if there is a pause of more than middleDelay between two packets.
  3. After a fixed total time of maxChannelTime has elapsed, regardless of how the packets arrive.

When I'm about to close the stream due to any of the above reasons, I first request it to close politely so it can do some cleanup. Sometimes it also sends me a final data packet during the cleanup. But I want to wait no longer than cleanupTime for the cleanup and last data to arrive before I close the stream and ignore any more messages.
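
The three closing conditions can be written down as a small plain JavaScript function, which may help when checking an rxjs solution against expected behaviour. This is only a sketch, not rxjs code: findCloseEvent is a hypothetical helper, the arrival times are assumed to be sorted milliseconds measured from the moment the stream opens, and the numeric values are made up for illustration.

```javascript
// Hypothetical helper (not part of rxjs): given sorted packet arrival times in ms,
// report which of the three conditions closes the stream, and when.
function findCloseEvent(arrivals, { startDelay, middleDelay, maxChannelTime }) {
  let candidate;
  if (arrivals.length === 0 || arrivals[0] > startDelay) {
    // Condition 1: the first packet took more than startDelay.
    candidate = { reason: 'startDelay', time: startDelay };
  } else {
    // Condition 2: find the first packet whose successor is missing or late.
    // The last packet always qualifies, because nothing follows it.
    const i = arrivals.findIndex((t, k) => {
      const next = k + 1 < arrivals.length ? arrivals[k + 1] : Infinity;
      return next - t > middleDelay;
    });
    candidate = { reason: 'middleDelay', time: arrivals[i] + middleDelay };
  }
  // Condition 3: the overall limit wins if nothing else fired before it.
  return candidate.time < maxChannelTime
    ? candidate
    : { reason: 'maxChannelTime', time: maxChannelTime };
}

// Illustrative values only (assumptions, not taken from a real process).
const limits = { startDelay: 600, middleDelay: 200, maxChannelTime: 3000 };
const closeEvent = findCloseEvent([100, 250, 400, 900], limits);
console.log(closeEvent.reason, closeEvent.time);
```

In this model the polite close request would go out at closeEvent.time, and anything arriving later than closeEvent.time + cleanupTime would be ignored.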

Elaboration

I'll create the "streams" by wrapping an event with an Observable. I have no trouble doing that.

By "closing" a stream, I mean telling the process to stop emitting data, and possibly to close (i.e. die).

GregRos asked Nov 05 '17 10:11


1 Answer

Tricky problem.

I've broken it down into two phases: 'regulated' (since we want to check for regular intervals), and 'cleanup'.

Working backwards, the output is built as follows (close and cleanupCloser are defined further down):

const regulated = source.takeUntil(close)
const cleanup = source.skipUntil(close).takeUntil(cleanupCloser)
const output = regulated.merge(cleanup)

'Closers' are observables that emit when it's time to close (one closer per timeout value).

const startTimeout = 600
const intervalTimeout = 200
const maxtimeTimeout = 3000
const cleanupTimeout = 300

const startCloser = Observable.timer(startTimeout)  // emit once after initial delay
  .takeUntil(source)                                // cancel after source emits
  .mapTo('startTimeoutMarker')

const intervalCloser = source.switchMap(x =>    // reset interval after each source emit
    Observable.timer(intervalTimeout)           // emit once after intervalTimeout
      .mapTo('intervalTimeoutMarker')
  )

const maxtimeCloser = Observable.timer(maxtimeTimeout)  // emit once after maxtime
  .takeUntil(startCloser)                               // cancel if startTimeout
  .takeUntil(intervalCloser)                            // cancel if intervalTimeout
  .mapTo('maxtimeTimeoutMarker')

const close = Observable.merge(startCloser, intervalCloser, maxtimeCloser).take(1)

const cleanupCloser = close.switchMap(x =>      // start when close emits
    Observable.timer(cleanupTimeout)            // emit once after cleanup time
  )
  .mapTo('cleanupTimeoutMarker')
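
To make the two phases concrete, here is a rough plain JavaScript model of which packets the merged output should carry once the close time is known. It is not RxJS code and only approximates the operators: partitionPackets is a hypothetical helper, the times are made-up milliseconds from stream start, and a packet arriving exactly at the close time is assumed to belong to the cleanup phase.

```javascript
// Hypothetical helper (not part of RxJS): sorts packet arrival times into the
// phase that would let them through.
//   regulated: arrived before close fired      (source.takeUntil(close))
//   cleanup:   arrived after close but before cleanupCloser fired
//              (source.skipUntil(close).takeUntil(cleanupCloser))
//   ignored:   arrived after the cleanup window ended
function partitionPackets(arrivals, closeTime, cleanupTimeout) {
  const cleanupDeadline = closeTime + cleanupTimeout;
  const result = { regulated: [], cleanup: [], ignored: [] };
  for (const t of arrivals) {
    if (t < closeTime) {
      result.regulated.push(t);
    } else if (t < cleanupDeadline) {
      result.cleanup.push(t);
    } else {
      result.ignored.push(t);
    }
  }
  return result;
}

// Illustrative timeline: the last regular packet arrives at 400 ms, so with
// intervalTimeout = 200 the close fires at 600 ms; cleanupTimeout = 300 then
// keeps the stream open until 900 ms.
const phases = partitionPackets([100, 250, 400, 750, 1200], 600, 300);
console.log(phases);
```

The merged output corresponds to the regulated packets followed by the cleanup packets; everything in ignored arrives after the stream has been shut.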

Here's a working sample CodePen (please run tests one at a time)

Richard Matsen answered Sep 25 '22 19:09
