
How to convert a stream of variable length packets into fixed length packets in RxJS?

I'm new to RxJS.

Given the following stream

[ 'foo ', 'bar', ' b', 'az 12', '3', '4567' ]

I want to transform it into fixed-size packets (say, 3 bytes each) plus the remainder:

['foo', ' ba', 'r b', 'az ', '123', '456', '7']

In real life it's actually a Buffer of binary data.

I'm wondering what's the idiomatic RxJS way to do it.

The trivial way I found is:

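// Assumed imports (RxJS 6 style); the original snippet did not show them
import { from } from 'rxjs'
import * as Rx from 'rxjs/operators'

from([ 'foo ', 'bar', ' b', 'az 12', '3', '4567' ])
.pipe(
    Rx.concatMap(v => from(v)),   // split each packet into single characters
    Rx.bufferCount(3),            // regroup into arrays of 3 characters
    Rx.map(v => v.join(''))       // join each group back into a string
)
.subscribe(v => console.log(v))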

It seems wasteful to split everything into single characters, so another way I found was to use .slice(), which might be better but is a lot more verbose.

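import { Observable } from 'rxjs'

const bufferToSize = (chunkSize) => (source) =>
    new Observable(subscriber => {
        // Carry-over between emissions; a string here, because the sample input is strings
        let buffer = ''

        return source.subscribe({
            next: (value) => {
                buffer += value

                // >= so that a full packet is emitted as soon as it is available
                while (buffer.length >= chunkSize) {
                    subscriber.next(buffer.slice(0, chunkSize))
                    buffer = buffer.slice(chunkSize)
                }
            },
            // Forward upstream errors instead of dropping them
            error: (err) => subscriber.error(err),
            complete: () => {
                // Emit the remainder only if there is one
                if (buffer.length > 0) {
                    subscriber.next(buffer)
                }
                subscriber.complete()
            }
        })
    });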

from([ 'foo ', 'bar', ' b', 'az 12', '3', '4567' ])
    .pipe(bufferToSize(3))
    .subscribe(v => console.log(v))
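Since the real input is said to be a Buffer of binary data, the carry-over logic could be sketched on Buffers directly, without going through strings. This is only a minimal sketch under assumptions: `PacketChunker` and `rechunkAll` are hypothetical names, not part of RxJS, and it assumes Node.js, where `Buffer` is a global.

```javascript
// Hypothetical helper (not an RxJS API): re-chunks Buffers into fixed-size packets.
class PacketChunker {
  constructor(chunkSize) {
    this.chunkSize = chunkSize;
    this.pending = Buffer.alloc(0); // bytes carried over between calls
  }

  // Accepts one variable-length chunk and returns every complete packet now available.
  push(chunk) {
    const data = Buffer.concat([this.pending, chunk]);
    const packets = [];
    let offset = 0;
    while (data.length - offset >= this.chunkSize) {
      packets.push(data.subarray(offset, offset + this.chunkSize));
      offset += this.chunkSize;
    }
    this.pending = data.subarray(offset); // keep the incomplete tail
    return packets;
  }

  // Returns the remainder (possibly empty) and resets the internal state.
  flush() {
    const rest = this.pending;
    this.pending = Buffer.alloc(0);
    return rest;
  }
}

// Convenience wrapper for illustration: runs a list of strings through the chunker.
function rechunkAll(strings, chunkSize) {
  const chunker = new PacketChunker(chunkSize);
  const out = [];
  for (const s of strings) {
    out.push(...chunker.push(Buffer.from(s)));
  }
  const rest = chunker.flush();
  if (rest.length > 0) out.push(rest);
  return out.map(b => b.toString());
}

console.log(rechunkAll(['foo ', 'bar', ' b', 'az 12', '3', '4567'], 3));
```

In a custom operator like the one above, push() would be called from next and flush() from complete, so the observable wrapper stays small and the byte handling can be tested on its own.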

Both return the expected results:

foo
 ba
r b
az
123
456
7

Is there a better way to do that, or at least a more idiomatic one?

Thanks

miedwar asked Aug 20 '18 14:08


1 Answer

Your first option is perfect, except that from() is unnecessary: concatMap accepts any iterable, so v => v is enough.

@Mark said it will wait for the observable to complete before taking the whole value, but it doesn't. It only waits until 3 characters have been gathered and then emits the buffer.

I created a delayed version to show that this is a continuous stream.

https://stackblitz.com/edit/buffer-mxsltx?file=index.ts&devtoolsheight=50
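The emission order can also be illustrated without RxJS. The following is a plain-JavaScript model of what concatMap(v => v), bufferCount(3) and map(join) do together; it is not RxJS itself, and `fixedPackets` is a hypothetical name used only for this sketch. It records when each input packet arrives and when each output packet leaves.

```javascript
// Plain-JavaScript model of concatMap(v => v) + bufferCount(size) + map(join).
// Hypothetical illustration only; it does not use or reproduce the RxJS internals.
function* fixedPackets(source, size, log = () => {}) {
  let group = [];
  for (const packet of source) {
    log(`in  ${JSON.stringify(packet)}`);
    for (const ch of packet) {       // strings are iterable, which is why v => v is enough
      group.push(ch);
      if (group.length === size) {   // a full group is emitted immediately
        const out = group.join('');
        log(`out ${JSON.stringify(out)}`);
        yield out;
        group = [];
      }
    }
  }
  if (group.length > 0) {            // the remainder is emitted only at the end
    const out = group.join('');
    log(`out ${JSON.stringify(out)}`);
    yield out;
  }
}

const events = [];
const result = [...fixedPackets(['foo ', 'bar', ' b', 'az 12', '3', '4567'], 3, e => events.push(e))];
console.log(events.join('\n'));
```

In the logged events, the "out" lines are interleaved with the "in" lines: each packet is emitted as soon as its third character arrives, and only the final remainder waits for the end of the input.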

kuroneko0441 answered Oct 05 '22 11:10