Understanding back-pressure in rxjs - only cache 5 images waiting for upload

I am working on a node project that needs to submit thousands of images for processing. Before these images are uploaded to the processing server they need to be resized so I have something along the lines of this:

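imageList
    .map(image => loadAndResizeImage(image)) // one inner Observable per image
    .merge(3)                                // resize at most 3 at a time
    .map(image => uploadImage(image))        // one inner Observable per upload
    .merge(3)                                // upload at most 3 at a time
    .subscribe();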

Image resizing typically takes a few tenths of a second, uploading and processing takes around 4 seconds.

How can I prevent thousands of resized images building up in memory as I wait for the upload queue to clear? I probably want to have 5 images resized and waiting to go so that as soon as an image upload finishes the next resized image is pulled from the queue and uploaded and a new image is resized and added to the 'buffer'.

An illustration of the issue can be found here:

https://jsbin.com/webaleduka/4/edit?js,console

Here there is a load step (taking 200ms) and a process step (taking 4 seconds). Each process is limited to a concurrency of 2. We can see that with 25 initial items we get to 20 images in memory.

I did look at the buffer options but neither seemed to do what I wanted to do.

At the moment I have just combined the load, resize and upload into one deferred observable that I merge with a max concurrency. I would like to have the images waiting for upload though and I am sure that it must be possible.

I am using RxJS 4 but I imagine the principles will be the same for 5.

Many Thanks.

Roaders asked Nov 28 '16 11:11


1 Answer

In RxJS 5 I'd do it like this:

Observable.range(1, 25)
    .bufferCount(5)
    .concatMap(batch => { // process images
        console.log('process', batch);
        return Observable.from(batch)
            .mergeMap(val => Observable.of('p' + val).delay(300))
            .toArray();
    })
    .concatMap(batch => { // send images
        console.log('send batch', batch);
        return Observable.from(batch)
            .mergeMap(val => Observable.of('s' + val).delay(500))
            .toArray();
    })
    .subscribe(val => {
        console.log('response', val);
    });

With the bufferCount operator I split the input into batches of 5 items. Each batch is then processed by the first concatMap() (I'm using concatMap on purpose because I want to wait until the nested Observable completes before the next batch starts). The processed batch is then passed to a second concatMap() that sends it to your server.
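To make the batching step concrete, here is a small dependency-free sketch of what bufferCount(5) does to a finite input. bufferCount is the real operator; the chunk helper below is a hypothetical stand-in written only for illustration, and it assumes the whole input is already available as an array.

```javascript
// Hypothetical helper: split a list into consecutive batches of `size` items,
// mirroring what bufferCount(size) emits for a finite source.
// A trailing partial batch is kept, as bufferCount does on completion.
function chunk(items, size) {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError('size must be a positive integer');
  }
  const batches = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Same input as Observable.range(1, 25) in the demo.
const input = Array.from({ length: 25 }, (_, i) => i + 1);
const batches = chunk(input, 5);
console.log(batches.length); // 5
console.log(batches[0]);     // [ 1, 2, 3, 4, 5 ]
```

Each of these batches is what the first concatMap() receives as its batch argument.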

I'm using two delay() operators to simulate tasks that take different amounts of time. In our case processing images is very quick, so the first concatMap will emit batches faster than the second concatMap is able to send them to the server, which is all right. The processed batches will queue up inside the second concatMap and will be sent one after another.

Output from this demo will look like this:

process [ 1, 2, 3, 4, 5 ]
send batch [ 'p1', 'p2', 'p3', 'p4', 'p5' ]
process [ 6, 7, 8, 9, 10 ]
process [ 11, 12, 13, 14, 15 ]
response [ 'sp1', 'sp2', 'sp3', 'sp4', 'sp5' ]
send batch [ 'p6', 'p7', 'p8', 'p9', 'p10' ]
process [ 16, 17, 18, 19, 20 ]
process [ 21, 22, 23, 24, 25 ]
response [ 'sp6', 'sp7', 'sp8', 'sp9', 'sp10' ]
send batch [ 'p11', 'p12', 'p13', 'p14', 'p15' ]
response [ 'sp11', 'sp12', 'sp13', 'sp14', 'sp15' ]
send batch [ 'p16', 'p17', 'p18', 'p19', 'p20' ]
response [ 'sp16', 'sp17', 'sp18', 'sp19', 'sp20' ]
send batch [ 'p21', 'p22', 'p23', 'p24', 'p25' ]
response [ 'sp21', 'sp22', 'sp23', 'sp24', 'sp25' ]

See live demo: https://jsbin.com/mileqa/edit?js,console

However, if you always want to process a batch, then send it, and only continue with the next batch once it has been sent, you'd have to move the inner Observable from the second concatMap so that it is chained after the toArray() in the first concatMap() call.

.concatMap(batch => { // process images
    console.log('process', batch);
    return Observable.from(batch)
        .mergeMap(val => Observable.of('p' + val).delay(100))
        .toArray()
        .concatMap(batch => { // send images
            console.log('send batch', batch);
            return Observable.from(batch)
                .mergeMap(val => Observable.of('s' + val).delay(500))
                .toArray();
        });
})

See live demo: https://jsbin.com/sabena/2/edit?js,console

This produces the following output:

process [ 1, 2, 3, 4, 5 ]
send batch [ 'p1', 'p2', 'p3', 'p4', 'p5' ]
response [ 'sp1', 'sp2', 'sp3', 'sp4', 'sp5' ]
process [ 6, 7, 8, 9, 10 ]
send batch [ 'p6', 'p7', 'p8', 'p9', 'p10' ]
response [ 'sp6', 'sp7', 'sp8', 'sp9', 'sp10' ]
process [ 11, 12, 13, 14, 15 ]
send batch [ 'p11', 'p12', 'p13', 'p14', 'p15' ]
response [ 'sp11', 'sp12', 'sp13', 'sp14', 'sp15' ]
process [ 16, 17, 18, 19, 20 ]
send batch [ 'p16', 'p17', 'p18', 'p19', 'p20' ]
response [ 'sp16', 'sp17', 'sp18', 'sp19', 'sp20' ]
process [ 21, 22, 23, 24, 25 ]
send batch [ 'p21', 'p22', 'p23', 'p24', 'p25' ]
response [ 'sp21', 'sp22', 'sp23', 'sp24', 'sp25' ]

You can see that "process", "send batch" and "response" logs are in order.
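For comparison, the same strictly ordered flow can be sketched without RxJS using promises. This is only an illustration of the ordering, not the code from the demo: processImage, sendImage and runSequentially are hypothetical names, and the delays are shortened. Note that Promise.all keeps input order, whereas mergeMap with toArray collects results in completion order.

```javascript
// Hypothetical stand-ins for the real work, with shortened delays.
const delay = (ms, value) =>
  new Promise(resolve => setTimeout(() => resolve(value), ms));
const processImage = val => delay(10, 'p' + val);
const sendImage = val => delay(50, 's' + val);

// Process one batch, send it, and only then move on to the next batch.
async function runSequentially(batches, log) {
  const responses = [];
  for (const batch of batches) {
    log('process ' + batch.join(','));
    const processed = await Promise.all(batch.map(processImage));
    log('send batch ' + processed.join(','));
    const sent = await Promise.all(processed.map(sendImage));
    log('response ' + sent.join(','));
    responses.push(sent);
  }
  return responses;
}

// Two small batches are enough to show the ordering.
const demoLog = [];
const demoDone = runSequentially([[1, 2], [3, 4]], line => demoLog.push(line));
demoDone.then(() => console.log(demoLog.join('\n')));
```

At most one batch is held in memory at a time here, at the cost of never resizing while an upload is in flight.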

The implementation in RxJS 4 should be almost identical (only the operator names might differ slightly).

In RxJS 4 there's also the controlled() operator, which doesn't exist in RxJS 5 (yet?). It may do something very similar to what you need.
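The pull-style idea behind controlled(), where the consumer decides when more items may flow, can be illustrated without RxJS. The sketch below is not that operator. It is a dependency-free approximation under assumptions: Semaphore, boundedPipeline, resize and upload are hypothetical names, the delays are scaled down, and error handling is omitted. A new image is only resized once a slot is free, so the number of images held (resizing, waiting, or uploading) never exceeds maxInMemory.

```javascript
const delay = (ms, value) =>
  new Promise(resolve => setTimeout(() => resolve(value), ms));

// Minimal counting semaphore (hypothetical helper).
class Semaphore {
  constructor(permits) {
    this.permits = permits;
    this.waiters = [];
  }
  async acquire() {
    if (this.permits > 0) {
      this.permits -= 1;
      return;
    }
    await new Promise(resolve => this.waiters.push(resolve));
  }
  release() {
    const next = this.waiters.shift();
    if (next) next();        // hand the permit straight to a waiter
    else this.permits += 1;
  }
}

async function boundedPipeline(items, resize, upload, maxInMemory, uploadConcurrency) {
  const memory = new Semaphore(maxInMemory);
  const uploads = new Semaphore(uploadConcurrency);
  let inMemory = 0;
  let peak = 0;
  const tasks = [];
  for (const item of items) {
    await memory.acquire();  // pauses the loop while the buffer is full
    inMemory += 1;
    peak = Math.max(peak, inMemory);
    tasks.push((async () => {
      try {
        const resized = await resize(item);
        await uploads.acquire();
        try {
          return await upload(resized);
        } finally {
          uploads.release();
        }
      } finally {
        inMemory -= 1;
        memory.release();    // frees a slot so the next image can be resized
      }
    })());
  }
  const results = await Promise.all(tasks);
  return { results, peak };
}

// 2 concurrent uploads plus 5 resized images waiting gives a limit of 7.
const resize = val => delay(2, 'p' + val);
const upload = val => delay(20, 's' + val);
const items = Array.from({ length: 25 }, (_, i) => i + 1);
const pipelineDone = boundedPipeline(items, resize, upload, 7, 2);
pipelineDone.then(({ results, peak }) => {
  console.log('uploaded', results.length, 'peak held in memory', peak);
});
```

Choosing maxInMemory as the upload concurrency plus the desired number of waiting images approximates the "5 resized and ready" behaviour described in the question.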

martin answered Sep 23 '22 16:09