Logo Questions Linux Laravel Mysql Ubuntu Git Menu

RxJS: Producer-consumer with abort

I've got a special producer consumer problem in RxJS: The producer slowly produces elements. A consumer is requesting elements and often has to wait for the producer. This can be achieved by zipping the producer and the request stream:

var produce = getProduceStream();
var request = getRequestStream();

var consume = Rx.Observable.zipArray(produce, request).pluck(0);

Sometimes a request gets aborted. A produced element should only consumed after a not aborted request:

produce:  -------------p1-------------------------p2--------->
request:  --r1--------------r2---------------r3-------------->
abort:    ------a(r1)------------------a(?)------------------>
consume:  ------------------c(p1, r2)-------------c(p2, r3)-->

The first request r1 would consume the first produced element p1, but r1 gets aborted by a(r1) before it can consume p1. p1 is produced and gets consumed c(p1, r2) on second request r2. The second abort a(?) is ignored, because no unanswered request happened before. The third request r3 has to wait on the next produced element p2 and is not aborted till p2 is produced. Thus, p2 is consumed c(p2, r3) immediately after it got produced.

How can I achieve this in RxJS?

Edit: I created an example with a QUnit test on jsbin. You can edit the function createConsume(produce, request, abort) to try/test your solution.

The example contains the function definition of the previously accepted answer.

like image 540
maiermic Avatar asked Feb 26 '15 17:02


1 Answers

This (core idea minus details) passes your JSBin test:

var consume = request
  .zip(abort.merge(produce), (r,x) => [r,x])
  .filter(([r,x]) => isNotAbort(x))
  .map(([r,p]) => p);

And the JSBin code.

like image 170
André Staltz Avatar answered Oct 05 '22 06:10

André Staltz