
How can I apply timed back pressure in RxJS5?

Imagine I have the following code:

let a = Rx.Observable.of(1, 2, 3)
let b = Rx.Observable.zip(a, a, (a, b) => a + b)
b.forEach(t => console.log(t))

This immediately outputs the results. Now, how do I put a timed delay between each message as a way of back-pressure (note that I don't want a buffer; instead, I want a and b to become Cold Observables), like:

b.takeEvery(1000).forEach(t => console.log(t))

And have the exact same answer:

<wait 1s>
2
<wait 1s>
4
<wait 1s>
6

Alternative: If back pressure (or a pull mechanism for some observables) is not supported in RxJS, then how could one create an infinite generator without running out of resources?

Alternative 2: Are there other JS frameworks that support both pull and push mechanisms?

Hugo Sereno Ferreira asked Jan 29 '18 03:01


1 Answer

RxJS 5.x does not support back pressure, but version 4.x has, for example, the pausable operator. It works only with hot observables. More info on back pressure in case of 4.x and here (especially take a look at the bottom, at the RxJS-related description).

This tweet by Erik Meijer may be a bit controversial, but it is relevant: https://twitter.com/headinthebox/status/774635475071934464

For your own implementation of a back pressure mechanism you need a 2-way communication channel, which can be created fairly easily with 2 subjects, one for each end. Basically, use next for sending messages and .subscribe for listening to the other end.

Creating a generator is doable as well, again using a subject to bridge between the push- and pull-based worlds. Below is an example implementation that generates Fibonacci numbers.
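A minimal sketch of that two-subject idea, under a few assumptions: MiniSubject is a hypothetical stand-in for Rx.Subject (only next and subscribe) so the snippet runs without any library, and createChannel, consume and schedule are made-up names for illustration. With RxJS you would presumably use new Rx.Subject() for both ends instead.

```javascript
// Hypothetical stand-in for Rx.Subject: supports only subscribe() and next().
class MiniSubject {
  constructor() { this.observers = []; }
  subscribe(fn) { this.observers.push(fn); }
  next(value) { this.observers.forEach(fn => fn(value)); }
}

// Producer end: emits exactly one value on `data` per message on `requests`.
function createChannel(source) {
  const requests = new MiniSubject(); // consumer -> producer
  const data = new MiniSubject();     // producer -> consumer
  const iterator = source[Symbol.iterator]();
  requests.subscribe(() => {
    const step = iterator.next();
    if (!step.done) data.next(step.value);
  });
  return { requests, data };
}

// Consumer end: handle a value, then ask for the next one.
// `schedule` decides when the next request is sent, e.g.
// fn => setTimeout(fn, 1000) for a one-second gap between values.
function consume(channel, handle, schedule) {
  channel.data.subscribe(value => {
    handle(value);
    schedule(() => channel.requests.next());
  });
  channel.requests.next(); // the initial request starts the flow
}

const received = [];
// A synchronous schedule is used here so the result is available immediately.
consume(createChannel([2, 4, 6]), v => received.push(v), fn => fn());
console.log(received); // [ 2, 4, 6 ]
```

The producer never runs ahead of the consumer, because it does nothing until it is asked; swapping the schedule for a setTimeout-based one gives the timed gap from the question.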

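const fib = () => {
  // Each n.next() call is a "pull" request for one more Fibonacci number.
  const n = new Rx.Subject()
  const f = n
    // Keep the last two numbers as state and advance one step per request.
    .scan(c => ({ a: c.b, b: c.b + c.a }), { a: 0, b: 1 })
    .map(c => c.a)
    
  return {
    $: f,                // the stream of generated numbers
    next: () => n.next() // request the next number
  }
}

const f = fib()

// Show each generated number in the #r element.
f.$.subscribe(n => document.querySelector('#r').innerHTML = n)
// Request one number per click on the #f button.
Rx.Observable.fromEvent(document.querySelector('#f'), 'click')
  .do(f.next)
  .subscribe()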
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>

<button id='f'>NEXT FIBONACCI</button>

<div id='r'>_?_</div>
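For comparison, the pull side on its own can be sketched with a native ES2015 generator, which computes a value only when the consumer asks for one, so an infinite sequence does not exhaust resources. This is a library-free sketch and not part of RxJS; take is a hypothetical helper introduced only for illustration.

```javascript
// Infinite Fibonacci sequence as a pull-based generator:
// nothing is computed until the consumer calls next().
function* fibonacci() {
  let a = 0, b = 1;
  while (true) {
    [a, b] = [b, a + b];
    yield a;
  }
}

// Hypothetical helper: pull at most `count` values from an iterator.
function take(iterator, count) {
  const out = [];
  for (let i = 0; i < count; i++) {
    const step = iterator.next();
    if (step.done) break;
    out.push(step.value);
  }
  return out;
}

const firstSix = take(fibonacci(), 6);
console.log(firstSix); // [ 1, 1, 2, 3, 5, 8 ]
```

The numbers match what the subject-based version emits click by click; the difference is that here the consumer drives the sequence directly instead of signalling through a subject.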

Another JS library that may be of interest to you is https://github.com/ubolonton/js-csp. I have not used it, so I am not sure how it deals with back pressure.

artur grzesiak answered Nov 20 '22 21:11