I am trying to add variables into a template at specific indices through streams.
The idea is that I have a readable stream as input and a list of variables, each of which can be a readable stream, a buffer or a string of undetermined size. These variables can be inserted at a predefined list of indices. I have a few questions based on my assumptions and what I have tried so far.
My first attempt was to do it manually with readable streams. However, I couldn't run const buffer = templateIn.read(size) (since the buffers were still empty) before template combined was trying to read it. The solution for that problem is similar to how you'd use a transform stream, so that was the next step I took.
However, I have a problem with transform streams: something like this pseudo code will pile up buffers in memory until done() is called.
public _transform(chunk: Buffer, encoding: string, done: (err?: Error, data?: any) => void): void {
    let index = 0;
    while (index < chunk.length) {
        if (index === this.variableIndex) { // the basic idea (the actual logic is a bit more complex)
            this.insertStreamHere(index);
            index++;
        } else {
            // continue reading stream normally
        }
    }
    done();
}
From: https://github.com/nodejs/node/blob/master/lib/_stream_transform.js
In a transform stream, the written data is placed in a buffer. When _read(n) is called, it transforms the queued up data, calling the buffered _write cb's as it consumes chunks. If consuming a single written chunk would result in multiple output chunks, then the first outputted bit calls the readcb, and subsequent chunks just go into the read buffer, and will cause it to emit 'readable' if necessary.
This way, back-pressure is actually determined by the reading side, since _read has to be called to start processing a new chunk. However, a pathological inflate type of transform can cause excessive buffering here. For example, imagine a stream where every byte of input is interpreted as an integer from 0-255, and then results in that many bytes of output. Writing the 4 bytes {ff,ff,ff,ff} would result in 1kb of data being output. In this case, you could write a very small amount of input, and end up with a very large amount of output. In such a pathological inflating mechanism, there'd be no way to tell the system to stop doing the transform. A single 4MB write could cause the system to run out of memory.
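The arithmetic in the quoted comment can be checked with a small sketch. The inflate function below is a hypothetical helper (it is not part of Node.js) that applies the "every byte becomes that many bytes of output" rule to a single chunk:

```typescript
import { Buffer } from "buffer";

// Hypothetical helper: each input byte b (0-255) produces b bytes of output.
function inflate(chunk: Buffer): Buffer {
    const parts: Buffer[] = [];
    for (let i = 0; i < chunk.length; i++) {
        // chunk[i] copies of "." (0x2e) for the byte at position i
        parts.push(Buffer.alloc(chunk[i], 0x2e));
    }
    return Buffer.concat(parts);
}

const inflated = inflate(Buffer.from([0xff, 0xff, 0xff, 0xff]));
console.log(inflated.length); // 4 * 255 = 1020 bytes, roughly 1 KB
```

Four bytes in, about a kilobyte out: the output size is decided entirely by the input values, which is why the reading side cannot limit it.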
So TL;DR: How do I insert (large) streams at a specific index without piling up a huge backlog of buffers in memory? Any advice is appreciated.
After a lot of reading of the documentation and the source code, a lot of trial and error and some testing, I have come up with a solution for my problem. I could just copy and paste my solution, but for the sake of completeness I will explain my findings here.
Handling back pressure with pipes consists of a few parts. We've got the Readable that writes data to the Writable. Along with each chunk, the Writable is handed a callback with which it can signal that it is ready to receive a new chunk of data. The reading part is simpler. The Readable has an internal buffer. Using Readable.push() will add data to the buffer. When the data is being read, it will come from this internal buffer. Next to that, we can use Readable.readableHighWaterMark and Readable.readableLength to make sure we don't push too much data at once. Readable.readableHighWaterMark - Readable.readableLength is the maximum number of bytes we should push to this internal buffer.
So this means that, since we want to read from two Readable streams at the same time, we need two Writable streams to control the flow. To merge data we will need to buffer it ourselves, since there is (as far as I know) no internal buffer in the Writable stream. So the Duplex stream will be the best option, because we want to handle buffering, writing and reading ourselves.
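The headroom calculation described above (readableHighWaterMark minus readableLength) can be sketched as follows. Both getters are standard properties of a Node.js Readable; availableSpace is a hypothetical helper name chosen for this illustration:

```typescript
import { Readable } from "stream";

// Hypothetical helper: how many more bytes fit before the high water mark is reached.
function availableSpace(readable: Readable): number {
    return Math.max(0, readable.readableHighWaterMark - readable.readableLength);
}

const demo = new Readable({
    highWaterMark: 16,
    read() { /* data is pushed manually below */ },
});
demo.push(Buffer.alloc(10));
console.log(availableSpace(demo)); // 6
```

The Math.max guard is an addition of this sketch: push() does not reject data beyond the high water mark, so the raw difference can become negative.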
Writing
So let's get to the code now. To control the state of multiple streams we will create a state interface, which looks as follows:
declare type StreamCallback = (error?: Error | null) => void;
interface MergingState {
    callback: StreamCallback;
    queue: BufferList;
    highWaterMark: number;
    size: number;
    finalizing: boolean;
}
The callback holds the last callback provided by either write or final (we'll get to final later). highWaterMark indicates the maximum size of our queue, and size is the current size of the queue. Lastly, the finalizing flag indicates that the current queue is the last queue, so once the queue is empty we're done reading the stream belonging to that state.
BufferList is a copy of the internal Node.js implementation used for the built-in streams.
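For readers who don't want to copy the internal class, a byte queue with the two operations used in this answer (push and consume) could look roughly like the sketch below. SimpleBufferList is a hypothetical stand-in written for this explanation, not the actual Node.js BufferList, and its consume takes only a byte count:

```typescript
import { Buffer } from "buffer";

// Hypothetical stand-in for a byte queue; not the Node.js internal BufferList.
class SimpleBufferList {
    private chunks: Buffer[] = [];
    public length = 0;

    public push(chunk: Buffer): void {
        this.chunks.push(chunk);
        this.length += chunk.length;
    }

    // Remove and return up to n bytes from the front of the queue.
    public consume(n: number): Buffer {
        const size = Math.max(0, Math.min(n, this.length));
        const parts: Buffer[] = [];
        let remaining = size;
        while (remaining > 0) {
            const head = this.chunks[0];
            if (head === undefined) {
                break; // cannot happen while length is tracked correctly
            }
            if (head.length <= remaining) {
                // the whole head chunk is consumed
                parts.push(head);
                this.chunks.shift();
                remaining -= head.length;
            } else {
                // only part of the head chunk is consumed; keep the rest queued
                parts.push(head.subarray(0, remaining));
                this.chunks[0] = head.subarray(remaining);
                remaining = 0;
            }
        }
        this.length -= size;
        return Buffer.concat(parts, size);
    }
}

const queue = new SimpleBufferList();
queue.push(Buffer.from("hello "));
queue.push(Buffer.from("world"));
console.log(queue.consume(8).toString()); // "hello wo"
console.log(queue.consume(8).toString()); // "rld"
```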
As mentioned before the writable handles the back pressure, so the generalized method for both our writables looks like the following:
/**
 * Method to write to provided state if it can
 *
 * (Will unshift the bytes that cannot be written back to the source)
 *
 * @param src the readable source that writes the chunk
 * @param chunk the chunk to be written
 * @param encoding the chunk encoding, currently not used
 * @param cb the streamCallback provided by the writing state
 * @param state the state which should be written to
 */
private writeState(src: Readable, chunk: Buffer, encoding: string, cb: StreamCallback, state: MergingState): void {
    this.mergeNextTick();
    const bytesAvailable = state.highWaterMark - state.size;
    if (chunk.length <= bytesAvailable) {
        // safe to write to our local buffer
        state.queue.push(chunk);
        state.size += chunk.length;
        if (chunk.length === bytesAvailable) {
            // our queue is full, so store our callback
            this.stateCallbackAndSet(state, cb);
        } else {
            // we still have some space, so we can call the callback immediately
            cb();
        }
        return;
    }
    if (bytesAvailable === 0) {
        // no space available unshift entire chunk
        src.unshift(chunk);
    } else {
        state.size += bytesAvailable;
        const leftOver = Buffer.alloc(chunk.length - bytesAvailable);
        chunk.copy(leftOver, 0, bytesAvailable);
        // push amount of bytes available
        state.queue.push(chunk.slice(0, bytesAvailable));
        // unshift what we cannot fit in our queue
        src.unshift(leftOver);
    }
    this.stateCallbackAndSet(state, cb);
}
First we check how much space is available to buffer. If there is enough space for our full chunk, we'll buffer it. If there is no space available, we will unshift the buffer to its readable source. If there is some space available, we'll only unshift what we cannot fit. If our buffer is full, we will store the callback that requests a new chunk. If there is space we will request our next chunk.
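The three cases described above (everything fits, nothing fits, only a part fits) reduce to one split of the chunk. The sketch below isolates that step; splitChunk and SplitResult are hypothetical names used only for this illustration. Unlike the method shown earlier, which copies the leftover into a fresh Buffer, this version returns subarray views that share memory with the original chunk:

```typescript
import { Buffer } from "buffer";

interface SplitResult {
    accepted: Buffer; // the part that fits into the queue
    leftOver: Buffer; // the part that has to be unshifted back to the source
}

// Hypothetical helper: split a chunk at the number of bytes still available.
function splitChunk(chunk: Buffer, bytesAvailable: number): SplitResult {
    const fits = Math.max(0, Math.min(chunk.length, bytesAvailable));
    return {
        accepted: chunk.subarray(0, fits),
        leftOver: chunk.subarray(fits),
    };
}

const split = splitChunk(Buffer.from("abcdef"), 4);
console.log(split.accepted.toString()); // "abcd"
console.log(split.leftOver.toString()); // "ef"
```

An empty leftOver corresponds to the "full chunk fits" case, and an empty accepted to the "no space available" case.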
this.mergeNextTick() is called because our state has changed and should be read in the next tick:
private mergeNextTick(): void {
    if (!this.mergeSync) {
        // make sure it is only called once per tick
        // we don't want to call it multiple times
        // since there will be nothing left to read the second time
        this.mergeSync = true;
        process.nextTick(() => this._read(this.readableHighWaterMark));
    }
}
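The "only once per tick" guard can also be looked at in isolation. The sketch below is a generic version of the same pattern; oncePerTick and the Scheduler type are hypothetical names, and the scheduler is a parameter only so the behaviour can be exercised without waiting for a real tick:

```typescript
type Scheduler = (task: () => void) => void;

// Hypothetical helper: returns a trigger that schedules `task` at most once
// until the scheduled run has started.
function oncePerTick(
    task: () => void,
    schedule: Scheduler = (fn) => process.nextTick(fn),
): () => void {
    let pending = false;
    return () => {
        if (pending) {
            return; // already scheduled for the upcoming tick
        }
        pending = true;
        schedule(() => {
            pending = false; // allow scheduling again once this run starts
            task();
        });
    };
}

let reads = 0;
const scheduleRead = oncePerTick(() => { reads += 1; });
scheduleRead();
scheduleRead();
scheduleRead();
process.nextTick(() => console.log(reads)); // 1
```

In the merge stream the flag is reset at the start of _read rather than in a wrapper, but the effect is the same: several writes within one tick lead to a single read.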
this.stateCallbackAndSet is a helper function that calls our last callback, to make sure we don't get into a state that makes our stream stop flowing, and then stores the new one provided.
/**
 * Helper function to call the callback if it exists and set the new callback
 * @param state the state which holds the callback
 * @param cb the new callback to be set
 */
private stateCallbackAndSet(state: MergingState, cb: StreamCallback): void {
    if (!state) {
        return;
    }
    if (state.callback) {
        const callback = state.callback;
        // do callback next tick, such that we can't get stuck in a writing loop
        process.nextTick(() => callback());
    }
    state.callback = cb;
}
Reading
Now onto the reading side. This is the part where we handle selecting the correct stream.
First our function to read the state, which is pretty straightforward: it reads the number of bytes it is able to read and returns the number of bytes read, which is useful information for our other function.
/**
 * Method to read the provided state if it can
 *
 * @param size the number of bytes to consume
 * @param state the state from which needs to be read
 * @returns the amount of bytes read
 */
private readState(size: number, state: MergingState): number {
    if (state.size === 0) {
        // our queue is empty so we read 0 bytes
        return 0;
    }
    let buffer = null;
    if (state.size < size) {
        buffer = state.queue.consume(state.size, false);
    } else {
        buffer = state.queue.consume(size, false);
    }
    this.push(buffer);
    this.stateCallbackAndSet(state, null);
    state.size -= buffer.length;
    return buffer.length;
}
The doRead method is where all the merging takes place: it fetches the nextMergingIndex. If the merging index is END, then we can just read the writingState until the end of the stream. If we are at the merging index, we read from the mergingState. Otherwise we read as much from the writingState as we can until we reach the next merging index.
/**
 * Method to read from the correct Queue
 *
 * The doRead method is called multiple times by the _read method until
 * it is satisfied with the returned size, or until no more bytes can be read
 *
 * @param n the number of bytes that can be read until highWaterMark is hit
 * @throws Errors when something goes wrong, so wrap this method in a try catch.
 * @returns the number of bytes read from either buffer
 */
private doRead(n: number): number {
    // first check all constants below 0,
    // which is only Merge.END right now
    const nextMergingIndex = this.getNextMergingIndex();
    if (nextMergingIndex === Merge.END) {
        // read writing state until the end
        return this.readWritingState(n);
    }
    const bytesToNextIndex = nextMergingIndex - this.index;
    if (bytesToNextIndex === 0) {
        // We are at the merging index, thus should read merging queue
        return this.readState(n, this.mergingState);
    }
    if (n <= bytesToNextIndex) {
        // We are safe to read n bytes
        return this.readWritingState(n);
    }
    // read the bytes until the next merging index
    return this.readWritingState(bytesToNextIndex);
}
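The decision made in doRead depends only on three numbers, so it can be written as a pure function and checked by hand. In the sketch below, planRead and ReadPlan are hypothetical names, and END = -1 is an assumption: the comment in doRead only says that Merge.END is a constant below 0.

```typescript
const END = -1; // assumption: stands in for Merge.END, which is some negative constant

interface ReadPlan {
    source: "main" | "merge"; // which queue to read from
    bytes: number;            // maximum number of bytes to read from it
}

// Hypothetical helper mirroring the branches of doRead.
function planRead(n: number, index: number, nextMergingIndex: number): ReadPlan {
    if (nextMergingIndex === END) {
        // nothing left to merge: read the main stream freely
        return { source: "main", bytes: n };
    }
    const bytesToNextIndex = nextMergingIndex - index;
    if (bytesToNextIndex === 0) {
        // we are exactly at the merge position
        return { source: "merge", bytes: n };
    }
    // never read past the next merge position
    return { source: "main", bytes: Math.min(n, bytesToNextIndex) };
}

console.log(planRead(16, 0, 10));   // { source: 'main', bytes: 10 }
console.log(planRead(16, 10, 10));  // { source: 'merge', bytes: 16 }
console.log(planRead(16, 10, END)); // { source: 'main', bytes: 16 }
```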
readWritingState reads the state and updates the index:
/**
 * Method to read from the writing state
 *
 * @param n maximum number of bytes to be read
 * @returns number of bytes written.
 */
private readWritingState(n: number): number {
    const bytesWritten = this.readState(n, this.writingState);
    this.index += bytesWritten;
    return bytesWritten;
}
Merging
For selecting our streams to merge we'll use a generator function. The generator function yields an index and a stream to merge at that index:
export interface MergingStream { index: number; stream: Readable; }
In doRead, getNextMergingIndex() is called. This function returns the index of the next MergingStream. If there is no next mergingStream, the generator is called to fetch a new mergingStream. If there is no new merging stream, we'll just return END.
/**
 * Method to get the next merging index.
 *
 * Also fetches the next merging stream if merging stream is null
 *
 * @returns the next merging index, or Merge.END if there is no new mergingStream
 * @throws Error when invalid MergingStream is returned by streamGenerator
 */
private getNextMergingIndex(): number {
    if (!this.mergingStream) {
        this.setNewMergeStream(this.streamGenerator.next().value);
        if (!this.mergingStream) {
            return Merge.END;
        }
    }
    return this.mergingStream.index;
}
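The method above relies on a detail of generators: once a generator is exhausted, next().value is undefined, which is what ends up producing END. A stripped-down sketch of that lookup, with the stream left out, is shown below. IndexedItem, nextIndex and the value -1 for END are assumptions made for this illustration:

```typescript
const END = -1; // assumption: stands in for Merge.END

interface IndexedItem {
    index: number;
}

// Hypothetical helper: take the next item from the generator and validate its index.
function nextIndex(generator: Iterator<IndexedItem>, currentIndex: number): number {
    const result = generator.next();
    if (result.done || result.value == null) {
        return END; // generator is exhausted, nothing left to merge
    }
    if (result.value.index < currentIndex) {
        throw new Error("Cannot merge at " + result.value.index + " because current index is " + currentIndex);
    }
    return result.value.index;
}

function* positions(): Generator<IndexedItem> {
    yield { index: 0 };
    yield { index: 4 };
}

const generator = positions();
console.log(nextIndex(generator, 0)); // 0
console.log(nextIndex(generator, 2)); // 4
console.log(nextIndex(generator, 4)); // -1 (END)
```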
In setNewMergeStream we create a new Writable which we can pipe our new merging stream into. For our Writable we will need to handle the write callback for writing to our state and the final callback to handle the last chunk. We should also not forget to reset our state.
/**
 * Method to set the new merging stream
 *
 * @throws Error when mergingStream has an index less than the current index
 */
private setNewMergeStream(mergingStream?: MergingStream): void {
    if (this.mergingStream) {
        throw new Error('There already is a merging stream');
    }
    // Set a new merging stream
    this.mergingStream = mergingStream;
    if (mergingStream == null || mergingStream.index === Merge.END) {
        // set new state
        this.mergingState = newMergingState(this.writableHighWaterMark);
        // We're done, for now...
        // mergingStream will be handled further once nextMainStream() is called
        return;
    }
    if (mergingStream.index < this.index) {
        throw new Error('Cannot merge at ' + mergingStream.index + ' because current index is ' + this.index);
    }
    // Create a new writable our new mergingStream can write to
    this.mergeWriteStream = new Writable({
        // Create a write callback for our new mergingStream
        write: (chunk, encoding, cb) => this.writeMerge(mergingStream.stream, chunk, encoding, cb),
        final: (cb: StreamCallback) => {
            this.onMergeEnd(mergingStream.stream, cb);
        },
    });
    // Create a new mergingState for our new merging stream
    this.mergingState = newMergingState(this.mergeWriteStream.writableHighWaterMark);
    // Pipe our new merging stream to our sink
    mergingStream.stream.pipe(this.mergeWriteStream);
}
Finalizing
The last step in the process is to handle our final chunks, such that we know when to end merging and can send an end chunk. In our main read loop we first read until our doRead() method returns 0 twice in a row, or has filled our read buffer. Once that happens we end our read loop and check our states to see if they have finished.
public _read(size: number): void {
    if (this.finished) {
        // we've finished, there is nothing left to read
        return;
    }
    this.mergeSync = false;
    let bytesRead = 0;
    do {
        const availableSpace = this.readableHighWaterMark - this.readableLength;
        bytesRead = 0;
        READ_LOOP: while (bytesRead < availableSpace && !this.finished) {
            try {
                const result = this.doRead(availableSpace - bytesRead);
                if (result === 0) {
                    // either there is nothing in our buffers
                    // or our states are outdated (since they get updated in doRead)
                    break READ_LOOP;
                }
                bytesRead += result;
            } catch (error) {
                this.emit('error', error);
                this.push(null);
                this.finished = true;
            }
        }
    } while (bytesRead > 0 && !this.finished);
    this.handleFinished();
}
Then in our handleFinished() we check our states.
private handleFinished(): void {
    if (this.finished) {
        // merge stream has finished, so nothing to check
        return;
    }
    if (this.isStateFinished(this.mergingState)) {
        this.stateCallbackAndSet(this.mergingState, null);
        // set our mergingStream to null, to indicate we need a new one
        // which will be fetched by getNextMergingIndex()
        this.mergingStream = null;
        this.mergeNextTick();
    }
    if (this.isStateFinished(this.writingState)) {
        this.stateCallbackAndSet(this.writingState, null);
        this.handleMainFinish(); // checks if there are still mergingStreams left, and sets finished flag
        this.mergeNextTick();
    }
}
The isStateFinished() method checks if our state has the finalizing flag set and if the queue size equals 0.
/**
 * Method to check if a specific state has completed
 * @param state the state to check
 * @returns true if the state has completed
 */
private isStateFinished(state: MergingState): boolean {
    if (!state || !state.finalizing || state.size > 0) {
        return false;
    }
    return true;
}
The finalizing flag is set in the final callback of our merging Writable stream. For our main stream we have to approach it a little differently, since we have little control over when our stream ends, because the readable calls the end of our writable by default. We want to remove this behavior such that we can decide when we finish our stream. This might cause some issues when other end listeners are set, but for most use cases this should be fine.
private onPipe(readable: Readable): void {
    // prevent our stream from being closed prematurely and unpipe it instead
    readable.removeAllListeners('end'); // Note: will cause issues if another end listener is set
    readable.once('end', () => {
        this.finalizeState(this.writingState);
        readable.unpipe();
    });
}
The finalizeState() method sets the flag and the callback to end the stream.
/**
 * Method to put a state in finalizing mode
 *
 * Finalizing mode: the last chunk has been received, when size is 0
 * the stream should be removed.
 *
 * @param state the state which should be put in finalizing mode
 * @param cb optional callback to store on the state
 */
private finalizeState(state: MergingState, cb?: StreamCallback): void {
    state.finalizing = true;
    this.stateCallbackAndSet(state, cb);
    this.mergeNextTick();
}
And that is how you merge multiple streams in one single sink.
TL;DR: The complete code
This code has been fully tested with my jest test suite on multiple edge cases, and it has a few more features than explained here, such as appending streams and merging into that appended stream by providing Merge.END as index.
Test result
You can see the tests I have run here. If I forgot any, send me a message and I may write another test for it.
MergeStream
✓ should throw an error when nextStream is not implemented (9ms)
✓ should throw an error when nextStream returns a stream with lower index (4ms)
✓ should reset index after new main stream (5ms)
✓ should write a single stream normally (50ms)
✓ should be able to merge a stream (2ms)
✓ should be able to append a stream on the end (1ms)
✓ should be able to merge large streams into a smaller stream (396ms)
✓ should be able to merge at the correct index (2ms)
Usage
const mergingStream = new Merge({
    *nextStream(): IterableIterator<MergingStream> {
        for (let i = 0; i < 10; i++) {
            const stream = new Readable();
            stream.push(i.toString());
            stream.push(null);
            yield {index: i * 2, stream};
        }
    },
});
const template = new Readable();
template.push(', , , , , , , , , ');
template.push(null);
template.pipe(mergingStream).pipe(getSink());
The result in our sink would be
0, 1, 2, 3, 4, 5, 6, 7, 8, 9
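To see why this is the expected output, it helps to compute the same merge on plain strings. The sketch below is only an in-memory reference for small inputs, not a replacement for the streaming version; insertAt and Insert are hypothetical names, and the template is assumed to be nine repetitions of ", ". Each index refers to an offset in the original template, not in the output:

```typescript
interface Insert {
    index: number; // offset in the ORIGINAL template
    text: string;  // text to insert at that offset
}

// Hypothetical in-memory reference; inserts must be ordered by ascending index.
function insertAt(template: string, inserts: Insert[]): string {
    let result = "";
    let position = 0;
    for (const { index, text } of inserts) {
        if (index < position) {
            throw new Error("Cannot merge at " + index + " because current index is " + position);
        }
        // copy the template up to the insert position, then add the insert
        result += template.slice(position, index) + text;
        position = index;
    }
    // copy whatever is left of the template
    return result + template.slice(position);
}

const inserts = Array.from({ length: 10 }, (_, i) => ({ index: i * 2, text: i.toString() }));
console.log(insertAt(", ".repeat(9), inserts)); // "0, 1, 2, 3, 4, 5, 6, 7, 8, 9"
```

The last insert sits at index 18, which is exactly the length of the assumed template, so it lands at the very end of the output.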
Final Thoughts
This is not the most time efficient way of doing it, since we only manage one merging buffer at once. So there is a lot of waiting. For my use case that is fine. I care about it not eating up my memory and this solution works for me. But there is definitely some space for optimization. The complete code has some extra features that are not fully explained here, such as appending streams and merging into that appended stream. They have been explained with comments though.