I am using the stream.pipeline functionality from Node to upload some data to S3. The basic idea I'm implementing is pulling files from a request and writing them to S3. I have one pipeline that pulls zip files and writes them to S3 successfully. However, I want my second pipeline to make the same request, but unzip and write the unzipped files to S3. The pipeline code looks like the following:
pipeline(request.get(...), s3Stream(zipFileWritePath)),
pipeline(request.get(...), new unzipper.Parse(), etl.map(entry => entry.pipe(s3Stream(createWritePath(writePath, entry)))))
The s3Stream function looks like so:
function s3Stream(file) {
  const pass = new stream.PassThrough()
  s3Store.upload(file, pass)
  return pass
}
The first pipeline works well and is currently running in production. However, when I add the second pipeline, I get the following error:
Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at Parse.onclose (internal/streams/end-of-stream.js:56:36)
at Parse.emit (events.js:187:15)
at Parse.EventEmitter.emit (domain.js:442:20)
at Parse.<anonymous> (/node_modules/unzipper/lib/parse.js:28:10)
at Parse.emit (events.js:187:15)
at Parse.EventEmitter.emit (domain.js:442:20)
at finishMaybe (_stream_writable.js:641:14)
at afterWrite (_stream_writable.js:481:3)
at onwrite (_stream_writable.js:471:7)
at /node_modules/unzipper/lib/PullStream.js:70:11
at afterWrite (_stream_writable.js:480:3)
at process._tickCallback (internal/process/next_tick.js:63:19)
Any idea what could be causing this or solutions to resolve this would be greatly appreciated!
Chaining is a mechanism for connecting the output of one stream to the input of another, creating a chain of stream operations. It is normally used together with piping. A typical example is piping data through a compression stream and then through a decompression stream.
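As a rough illustration of chaining, here is a minimal sketch that compresses and then decompresses some text in one chain. It works on an in-memory string rather than a file so that it stays self-contained, and roundTrip is a hypothetical helper name, not something from the question.

```javascript
const { Readable, Writable, pipeline } = require('stream');
const zlib = require('zlib');

// Chain: source -> gzip -> gunzip -> collector.
// Resolves with the text after it has been compressed and decompressed again.
function roundTrip(text) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    // Simple writable that collects every chunk it receives
    const collector = new Writable({
      write(chunk, _encoding, done) {
        chunks.push(chunk);
        done();
      }
    });
    pipeline(
      Readable.from([Buffer.from(text)]),
      zlib.createGzip(),
      zlib.createGunzip(),
      collector,
      err => (err ? reject(err) : resolve(Buffer.concat(chunks).toString()))
    );
  });
}
```

Calling roundTrip('some text') should resolve with the same string, since the gunzip stage undoes the gzip stage.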
PassThrough is a trivial implementation of a Transform stream that simply passes the input bytes across to the output. It is mainly useful for testing and a few other simple cases, such as sitting between a readable stream and a writable stream.
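A minimal sketch of a PassThrough sitting between a readable and a writable stream could look like this. The copyThrough helper and the in-memory sink are made up for the example.

```javascript
const { PassThrough, Readable, Writable, pipeline } = require('stream');

// Pipe the given string parts through a PassThrough into an in-memory sink.
// Resolves with everything the sink received, joined into one string.
function copyThrough(parts) {
  return new Promise((resolve, reject) => {
    const received = [];
    const sink = new Writable({
      write(chunk, _encoding, done) {
        received.push(chunk.toString());
        done();
      }
    });
    pipeline(
      Readable.from(parts),
      new PassThrough(), // forwards the bytes without changing them
      sink,
      err => (err ? reject(err) : resolve(received.join('')))
    );
  });
}
```

The data arriving at the sink should be identical to what went in, which is why the s3Stream function in the question can hand a PassThrough to the uploader and write to it from the other side.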
A stream is an abstract interface for working with streaming data in Node.js. The node:stream module provides an API for implementing the stream interface. There are many stream objects provided by Node.js. For instance, a request to an HTTP server and process.stdout are both stream instances.
When you use a pipeline, you agree to consume the readable stream fully: nothing in the chain should stop before the readable ends.
After some time working with those shenanigans, here is some more useful information.
import stream from 'stream'

const s1 = new stream.PassThrough()
const s2 = new stream.PassThrough()
const s3 = new stream.PassThrough()

s1.on('end', () => console.log('end 1'))
s2.on('end', () => console.log('end 2'))
s3.on('end', () => console.log('end 3'))
s1.on('close', () => console.log('close 1'))
s2.on('close', () => console.log('close 2'))
s3.on('close', () => console.log('close 3'))

stream.pipeline(
  s1,
  s2,
  s3,
  // Final stage: drain everything that comes out of s3
  async s => { for await (const _ of s) { } },
  // Called once the pipeline finishes or fails
  err => console.log('end', err)
)
Now, if I call s2.end(), it ends and closes s2 and all of its parents (here, s3):
end 2
close 2
end 3
close 3
The pipeline is the equivalent of s3(s2(s1)), so s3 is the parent of s2.
But if I call s2.destroy(), it prints the output below and destroys everything. This is your problem here: a stream is destroyed before it ends normally, because of either an error or a return/break/throw in an async generator or async function.
close 2
end Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at PassThrough.onclose (internal/streams/end-of-stream.js:117:38)
at PassThrough.emit (events.js:327:22)
at emitCloseNT (internal/streams/destroy.js:81:10)
at processTicksAndRejections (internal/process/task_queues.js:83:21) {
code: 'ERR_STREAM_PREMATURE_CLOSE'
}
close 1
close 3
You must not leave any of the streams without a way to catch its errors.
stream.pipeline() leaves dangling event listeners on the streams after the callback has been invoked. In the case of reuse of streams after failure, this can cause event listener leaks and swallowed errors.
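One way to make sure every stream has somewhere to report its errors is to give each source its own pipeline and await it, instead of calling a bare .pipe() with no handler. The sketch below only uses Node's own stream module. The names writeAll and makeSink and the in-memory store are hypothetical stand-ins (for example, for the zip entries and the s3Stream target in the question), so treat it as an outline under those assumptions rather than a drop-in fix.

```javascript
const { Writable, pipeline } = require('stream');
const { promisify } = require('util');

const pipelineAsync = promisify(pipeline);

// Hypothetical in-memory sink standing in for an upload target.
function makeSink(store, name) {
  store[name] = '';
  return new Writable({
    write(chunk, _encoding, done) {
      store[name] += chunk.toString();
      done();
    }
  });
}

// Run one pipeline per entry so that a failure in any stream is caught here
// instead of surfacing as an unhandled error or a silent premature close.
async function writeAll(entries, store) {
  const failures = [];
  for (const { name, source } of entries) {
    try {
      await pipelineAsync(source, makeSink(store, name));
    } catch (err) {
      failures.push({ name, code: err.code });
    }
  }
  return failures;
}
```

Here writeAll takes an array of { name, source } objects, where source is any readable stream, and returns the list of entries whose pipeline failed. The other entries are still written.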
Node source (14.4):
const onclose = () => {
  if (readable && !readableEnded) {
    if (!isReadableEnded(stream))
      return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
  }
  if (writable && !writableFinished) {
    if (!isWritableFinished(stream))
      return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
  }
  callback.call(stream);
};
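The same check can be observed from the outside with stream.finished, which reports how a stream closed. This is a small sketch, with closeResult as a made-up helper name: a stream that is ended and drained closes cleanly, while one that is destroyed first reports the premature close.

```javascript
const { PassThrough, finished } = require('stream');

// Create a PassThrough, apply `action` to it, and resolve with 'ok'
// or with the error code reported when the stream closes.
function closeResult(action) {
  return new Promise(resolve => {
    const s = new PassThrough();
    finished(s, err => resolve(err ? err.code : 'ok'));
    action(s);
  });
}

// Ended and fully consumed: 'end' and 'finish' fire before 'close'.
const normal = closeResult(s => { s.resume(); s.end(); });

// Destroyed before ending: 'close' fires without 'end' or 'finish'.
const destroyed = closeResult(s => s.destroy());

Promise.all([normal, destroyed]).then(([a, b]) => {
  console.log(a); // ok
  console.log(b); // ERR_STREAM_PREMATURE_CLOSE
});
```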