I have an RxJS Observable which emits binary data of type Uint8Array values. But not every emitted value contains exactly one complete data oject which can be processed on its own.
The data format of complete data objects consist of a start Byte (0xAA), some variable length data in between and an end byte (0xFF). The data in between is BCD encoded, this means principally it does not contain start or end bytes but only binary values from 0x00 to 0x99.
Here is an example:
// This is a mock of the source observable which emits values:
const source$ = from([
// Case 1: One complete data object with start (0xAA) and end byte (0xFF)
new Uint8Array([0xAA, 0x01, 0x05, 0x95, 0x51, 0xFF,]),
// Case 2: Two complete data objects in a single value emit
new Uint8Array([0xAA, 0x12, 0x76, 0xFF, 0xAA, 0x83, 0x43, 0xFF,]),
// Case 3: Two uncomplete value emits which form a single data object
new Uint8Array([0xAA, 0x61, 0x85, 0x43, 0x67]),
new Uint8Array([0x82, 0x73, 0x44, 0x28, 0x85, 0xFF]),
// Case 4: A combination of Cases 2 and 3
new Uint8Array([0xAA, 0x61, 0x85, 0x43, 0x67]),
new Uint8Array([0x55, 0x81, 0xFF, 0xAA, 0x73, 0x96]),
new Uint8Array([0x72, 0x23, 0x11, 0x95, 0xFF]),
])
source$.subscribe((x) => {
console.log('Emitted value as Hexdump:')
console.log(hexdump(x.buffer))
})

The goal is to receive only complete data objects. Maybe as a tranformed new observable?
The example from above should be something like this:
const transformedSource$ = from([
// Case 1
new Uint8Array([0xAA, 0x01, 0x05, 0x95, 0x51, 0xFF,]),
// Case 2
new Uint8Array([0xAA, 0x12, 0x76, 0xFF,]),
new Uint8Array([0xAA, 0x83, 0x43, 0xFF,]),
// Case 3
new Uint8Array([0xAA, 0x61, 0x85, 0x43, 0x67, 0x82, 0x73, 0x44, 0x28, 0x85, 0xFF]),
// Case 4
new Uint8Array([0xAA, 0x61, 0x85, 0x43, 0x67, 0x55, 0x81, 0xFF]),
new Uint8Array([0xAA, 0x73, 0x96, 0x72, 0x23, 0x11, 0x95, 0xFF]),
])
0xFF and then to do merges again. How to do this? Ideas from people with RxJS experience is much appreciated.What you could try is splitting the chunks in order to process the bytes one by one, adding them to a buffer until the 0xff byte appears in the stream and returning all buffered elements and reset the buffer for the next chunk:
let buffer = new Uint8Array();
transformedSource$.pipe(
mergeAll(), // this splits your array and emits the single bytes into the stream
mergeMap((next) => {
buffer = new Uint8Array([...buffer, next]);
if (next === 0xff) { (
const result = buffer;
buffer = new Uint8Array(); // resets the buffer
return of(result); // will emit a completed chunk
}
return EMPTY; // won't emit anything
})
)
.subscribe(console.log);
and in case you are not a fan of global variables or you want to reuse that code also for other streams, here an alternative solution with a custom operator:
const mergeChunks = () => {
let buffer = new Uint8Array();
return (source$) =>
source$.pipe(
mergeMap((next) => {
buffer = new Uint8Array([...buffer, next]);
if (next === 0xff) {
const result = buffer;
buffer = new Uint8Array();
return of(result);
}
return EMPTY;
})
);
}
transformedSource$.pipe(
mergeAll(),
mergeChunks(),
)
.subscribe(console.log);
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With