I'm new to RxJS.
Given the following stream
[ 'foo ', 'bar', ' b', 'az 12', '3', '4567' ]
I want to transform it into a fixed size packets (say of 3 bytes) + the remainder
['foo', ' ba', 'r b', 'az ', '123', '456', '7']
In real life it's actually a Buffer of binary data.
I'm wondering what's the idiomatic RxJS way to do it.
The trivial way I found is:
from([ 'foo ', 'bar', ' b', 'az 12', '3', '4567' ])
.pipe(
Rx.concatMap(v => from(v)),
Rx.bufferCount(3),
Rx.map(v => v.join(''))
)
.subscribe(v => console.log(v))
It seems wasteful to split everything into a single chars, so another way I found was to use .slice() which might be better, but a lot more verbose.
const bufferToSize = (chunkSize) => (source) =>
Observable.create(subscriber => {
let buffer = new Buffer('')
return source.subscribe({
next: (value) => {
buffer += value
while (buffer.length > chunkSize) {
subscriber.next(buffer.slice(0, chunkSize))
buffer = buffer.slice(chunkSize, buffer.length)
}
},
complete: () => {
subscriber.next(buffer)
subscriber.complete()
}
})
});
from([ 'foo ', 'bar', ' b', 'az 12', '3', '4567' ])
.pipe(bufferToSize(3))
.subscribe(v => console.log(v))
both return expected results
foo
ba
r b
az
123
456
7
Is there a better way to do that? or at least more idiomatic way?
Thanks
Your first Option is perfect(except for from()
, just use v => v
).
@Mark said it will wait for the observable complete to take whole value, but it`s not. It just waits for 3 chars gathered and then emit the buffer.
I created delayed version to show you this is continuous stream.
https://stackblitz.com/edit/buffer-mxsltx?file=index.ts&devtoolsheight=50
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With