I've got a special producer consumer problem in RxJS: The producer slowly produces elements. A consumer is requesting elements and often has to wait for the producer. This can be achieved by zipping the producer and the request stream:
var produce = getProduceStream();
var request = getRequestStream();
var consume = Rx.Observable.zipArray(produce, request).pluck(0);
Sometimes a request gets aborted. A produced element should only consumed after a not aborted request:
produce: -------------p1-------------------------p2--------->
request: --r1--------------r2---------------r3-------------->
abort: ------a(r1)------------------a(?)------------------>
consume: ------------------c(p1, r2)-------------c(p2, r3)-->
The first request r1
would consume the first produced element p1
, but r1
gets aborted by a(r1)
before it can consume p1
. p1
is produced and gets consumed c(p1, r2)
on second request r2
. The second abort a(?)
is ignored, because no unanswered request happened before. The third request r3
has to wait on the next produced element p2
and is not aborted till p2
is produced. Thus, p2
is consumed c(p2, r3)
immediately after it got produced.
How can I achieve this in RxJS?
Edit:
I created an example with a QUnit test on jsbin. You can edit the function createConsume(produce, request, abort)
to try/test your solution.
The example contains the function definition of the previously accepted answer.
This (core idea minus details) passes your JSBin test:
var consume = request
.zip(abort.merge(produce), (r,x) => [r,x])
.filter(([r,x]) => isNotAbort(x))
.map(([r,p]) => p);
And the JSBin code.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With