
Rx: Buffer with variable back pressured buffer time

I have played a little with Rx but still consider myself fairly new to it. I have a problem, and I am wondering whether I can solve it with Rx. My initial use case is in C#, but I may later want the same in JS (so if answers include code snippets, any language or pseudocode is fine).

Currently I have a client app which simply buffers the data (transactions) it creates and sends it to the server every 8 seconds. This data can contain many individual transactions, e.g. the client has stored many while offline and has now come online, so we want to send perhaps thousands to the server. When sending many as just described, the 8 second delay and buffering is fine (the data is already delayed anyway). However, when the client is connected and we create a transaction in real time (i.e. just 1 or 2 single transactions), I would like to send these immediately, i.e. not wait the 8 seconds.

So it looks similar to the Rx buffer, crossed with maybe the debounce? I have attempted to draw a marble diagram to help explain, using an advanced graphics package (not really, just Paint).

[Image: marble diagram]

So, to walk through this: the 1st red marble is received and forwarded straight away. Next, the yellow marble is also forwarded straight away, as it has been 1 second since the red marble.

Now the light blue marble and a bunch of others come in less than 1 second apart, so we want to buffer these, as we don't want to spam the network with possibly thousands of requests. What we will do here is buffer for, say, 5 seconds and then send however many we have buffered, every 5 seconds, until this "spurt" has finished. After this, we want to return to sending any other "individual" requests as they come.

It doesn't have to be exactly like the above; basically we want:

  • Individual transactions (due to user input with a connected client app) to be sent immediately (or after some very small delay)

  • To detect when we start getting many transactions, and have this start to "throttle", then send the buffered batches at some longer interval apart (e.g. 5 or 8 seconds)

  • NOT to throw away any transactions (marbles); I want to send them all
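
The walkthrough and the three requirements above can be replayed as a small simulation. This is only a sketch in plain JavaScript, not Rx: `planSends`, its parameters, and the `{ at, item }` event shape are hypothetical names, the 1 second and 5 second thresholds are the example values from the text, and it assumes the first item of a burst is still sent at once, because a burst is only detectable when its second item arrives.

```javascript
// Replays timestamped items (sorted by `at`, in ms) and returns the planned sends.
// Assumption: an item arriving less than quickGapMs after the previous one starts batching.
function planSends(events, quickGapMs = 1000, batchMs = 5000) {
  const sends = [];
  let lastArrival = -Infinity;
  let buffer = [];
  let flushAt = null; // time of the next scheduled flush; null when not batching

  // Run every flush that is due at or before `now`.
  const flushDue = (now) => {
    while (flushAt !== null && flushAt <= now) {
      if (buffer.length > 0) {
        sends.push({ at: flushAt, items: buffer });
        buffer = [];
        flushAt += batchMs; // keep batching while the spurt continues
      } else {
        flushAt = null; // a whole window passed with nothing buffered: spurt over
      }
    }
  };

  for (const { at, item } of events) {
    flushDue(at);
    if (flushAt !== null) {
      buffer.push(item); // already batching
    } else if (at - lastArrival < quickGapMs) {
      buffer.push(item); // burst detected: start batching
      flushAt = at + batchMs;
    } else {
      sends.push({ at, items: [item] }); // isolated item: send immediately
    }
    lastArrival = at;
  }
  flushDue(Infinity); // drain whatever is still buffered, so nothing is dropped
  return sends;
}

const plan = planSends([
  { at: 0, item: 'red' },
  { at: 1000, item: 'yellow' },
  { at: 3000, item: 'lightblue' },
  { at: 3100, item: 'green' },
  { at: 3200, item: 'blue' },
  { at: 4000, item: 'pink' },
]);
console.log(plan.map(s => `${s.at}ms ${s.items.join(',')}`).join('\n'));
```

With these timings, red, yellow and lightblue are each planned as single sends at their arrival time, and green, blue and pink go out together at 8100 ms.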

I have found a number of other posts similar, but not the same as this.

I've come up with some clunky "manual" ways of doing this (using just standard lists, various timers, etc.), but I was wondering whether Rx could do some of this work and, hopefully, be less prone to bugs.

Thanks in advance for any help here!

peterc asked Nov 17 '25 04:11



1 Answer

I think you have already picked the right operators: buffer, with debounce as the buffer trigger.

Regarding "when this client is, for example connected": if you want to add a connection event, you could merge that into bufferTrigger as well.

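console.clear();

const source = new Rx.Subject();
// Fires once the source has been quiet for 500 ms
const bufferTrigger = source.debounceTime(500);

source
  .buffer(bufferTrigger) // collect items until the trigger fires, then emit them as one array
  .subscribe(console.log);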

setTimeout(() => source.next('red'), 0);
setTimeout(() => source.next('yellow'), 1000);
setTimeout(() => source.next('lightblue'), 3000);
setTimeout(() => source.next('green'), 3100);
setTimeout(() => source.next('blue'), 3200);
setTimeout(() => source.next('pink'), 4000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>
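
To see how the debounce-triggered buffer groups these timings without loading Rx, the grouping can be replayed by hand. The sketch below is plain JavaScript, not RxJS: `debounceBuffers` and the `{ at, item }` event shape are names made up for illustration, and it assumes the events are sorted by time and that an item arriving exactly `quietMs` after the previous one still joins the same batch.

```javascript
// Groups timestamped items the way buffer(debounceTime(quietMs)) would:
// a batch closes once the source has been quiet for more than quietMs.
function debounceBuffers(events, quietMs) {
  const batches = [];
  let current = [];
  let lastAt = null;

  for (const { at, item } of events) {
    // A long enough gap means the debounce trigger fired at lastAt + quietMs.
    if (lastAt !== null && at - lastAt > quietMs) {
      batches.push({ at: lastAt + quietMs, items: current });
      current = [];
    }
    current.push(item);
    lastAt = at;
  }
  // The final batch is emitted quietMs after the last item.
  if (current.length > 0) {
    batches.push({ at: lastAt + quietMs, items: current });
  }
  return batches;
}

const batches = debounceBuffers([
  { at: 0, item: 'red' },
  { at: 1000, item: 'yellow' },
  { at: 3000, item: 'lightblue' },
  { at: 3100, item: 'green' },
  { at: 3200, item: 'blue' },
  { at: 4000, item: 'pink' },
], 500);
console.log(batches.map(b => `${b.at}ms ${b.items.join(',')}`).join('\n'));
// 500ms red
// 1500ms yellow
// 3700ms lightblue,green,blue
// 4500ms pink
```

Note that every item, even an isolated one, waits for the 500 ms quiet period before it is emitted.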

Example with an additional trigger 5 seconds after the last emit:

const source = new Rx.Subject();
const lastEmit = new Rx.Subject();
// Extra trigger: fires 5 s after each buffer emission
const maxDelayAfterLastEmit = lastEmit.delay(5000);
const bufferTrigger = source.debounceTime(500)
  .merge(maxDelayAfterLastEmit);

const emits = source
  .buffer(bufferTrigger)
  .do(x => lastEmit.next(x))   // record every buffer emission, including empty ones
  .filter(x => x.length);      // drop empty buffers

const start = new Date().getTime();
emits.subscribe(x => console.log((new Date().getTime() - start)/1000  + "s " + x));

setTimeout(() => source.next('red'), 0);
setTimeout(() => source.next('yellow'), 1000);
setTimeout(() => source.next('lightblue'), 3000);
setTimeout(() => source.next('green'), 3100);
setTimeout(() => source.next('blue1'), 3200);
setTimeout(() => source.next('blue2'), 3300);
setTimeout(() => source.next('blue3'), 3600);
setTimeout(() => source.next('pink1'), 4000);
setTimeout(() => source.next('pink2'), 4400);
setTimeout(() => source.next('pink3'), 4800);
setTimeout(() => source.next('pink4'), 5200);
setTimeout(() => source.next('pink5'), 5600);
setTimeout(() => source.next('pink6'), 6000);
setTimeout(() => source.next('pink7'), 6400);
setTimeout(() => source.next('pink8'), 6800);
setTimeout(() => source.next('pink9'), 7200);
setTimeout(() => source.next('pink10'), 7700);
setTimeout(() => source.next('pink11'), 8200);
setTimeout(() => source.next('pink12'), 8600);
setTimeout(() => source.next('pink13'), 9000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>
Richard Matsen answered Nov 18 '25 19:11
