
Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close in Node Pipeline stream

I am using the stream.pipeline functionality from Node to upload some data to S3. The basic idea I'm implementing is pulling files from a request and writing them to S3. I have one pipeline that pulls zip files and writes them to S3 successfully. However, I want my second pipeline to make the same request, but unzip and write the unzipped files to S3. The pipeline code looks like the following:

pipeline(request.get(...), s3Stream(zipFileWritePath)),
pipeline(request.get(...), new unzipper.Parse(), etl.map(entry => entry.pipe(s3Stream(createWritePath(writePath, entry)))))

The s3Stream function looks like so:

function s3Stream(file) {
    const pass = new stream.PassThrough()
    s3Store.upload(file, pass)
    return pass
}

The first pipeline works well and is currently running in production without problems. However, when I add the second pipeline, I get the following error:

Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at Parse.onclose (internal/streams/end-of-stream.js:56:36)
at Parse.emit (events.js:187:15)
at Parse.EventEmitter.emit (domain.js:442:20)
at Parse.<anonymous> (/node_modules/unzipper/lib/parse.js:28:10)
at Parse.emit (events.js:187:15)
at Parse.EventEmitter.emit (domain.js:442:20)
at finishMaybe (_stream_writable.js:641:14)
at afterWrite (_stream_writable.js:481:3)
at onwrite (_stream_writable.js:471:7)
at /node_modules/unzipper/lib/PullStream.js:70:11
at afterWrite (_stream_writable.js:480:3)
at process._tickCallback (internal/process/next_tick.js:63:19)

Any idea what could be causing this or solutions to resolve this would be greatly appreciated!

Jason Cromer asked May 02 '19 20:05




1 Answer

TL;DR

When you use a pipeline, you commit to consuming the readable stream in full; nothing in the chain should stop before the readable ends.

Deep dive

After some time working with these shenanigans, here is some more useful information.

import stream from 'stream'

const s1 = new stream.PassThrough()
const s2 = new stream.PassThrough()
const s3 = new stream.PassThrough()

s1.on('end', () => console.log('end 1'))
s2.on('end', () => console.log('end 2'))
s3.on('end', () => console.log('end 3'))
s1.on('close', () => console.log('close 1'))
s2.on('close', () => console.log('close 2'))
s3.on('close', () => console.log('close 3'))

stream.pipeline(
    s1,
    s2,
    s3,
    async s => { for await (const _ of s) { } }, // drain the last stream; `const` is required in strict (ESM) mode
    err => console.log('end', err)
)

Now if I call s2.end(), s2 ends and closes, and so does every stream after it:

end 2
close 2
end 3
close 3

pipeline is the equivalent of s3(s2(s1))

But if I call s2.destroy(), the pipeline reports an error and destroys every stream. This is your problem: a stream is destroyed before it ends normally, because of either an error or a return/break/throw in an async generator or async function.

close 2
end Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
    at PassThrough.onclose (internal/streams/end-of-stream.js:117:38)
    at PassThrough.emit (events.js:327:22)
    at emitCloseNT (internal/streams/destroy.js:81:10)
    at processTicksAndRejections (internal/process/task_queues.js:83:21) {
  code: 'ERR_STREAM_PREMATURE_CLOSE'
}
close 1
close 3
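
The two cases above can be compared side by side. This is a minimal sketch that uses only Node's built-in stream module; runPipeline is a hypothetical helper name, and CommonJS require is used here only so the snippet runs as a plain script. The last stream is drained with resume() instead of an async function.

```javascript
const { PassThrough, pipeline } = require('stream')

// Hypothetical helper: build s1 -> s2 -> s3, apply `action` to the streams,
// and resolve with the error (if any) that the pipeline callback receives.
function runPipeline(action) {
  return new Promise(resolve => {
    const s1 = new PassThrough()
    const s2 = new PassThrough()
    const s3 = new PassThrough()
    s3.resume() // discard output so the last stream can reach its end
    pipeline(s1, s2, s3, err => resolve(err))
    action(s1, s2, s3)
  })
}

// Ending the source lets every stage finish normally.
runPipeline(s1 => s1.end())
  .then(err => console.log('ended source:', err ? err.code : 'no error'))

// Destroying a middle stage closes it before it has ended.
runPipeline((s1, s2) => s2.destroy())
  .then(err => console.log('destroyed s2:', err ? err.code : 'no error'))
```

In the second call the callback receives an error whose code is ERR_STREAM_PREMATURE_CLOSE, the same error shown in the output above.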

Do not leave any of the streams without a way to catch its errors.
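
One way to give every stream an error path is to run each inner copy through its own pipeline instead of a bare .pipe(). The sketch below assumes Node 15 or later (for stream/promises) and uses plain Node streams only; copyWithErrorHandling is a hypothetical helper, and this is not the unzipper or etl API from the question.

```javascript
const { Readable, Writable } = require('stream')
const { pipeline } = require('stream/promises')

// Hypothetical helper: copy one source into one destination and report
// the outcome instead of letting a failure go unnoticed.
async function copyWithErrorHandling(source, destination) {
  try {
    await pipeline(source, destination)
    return { ok: true }
  } catch (err) {
    return { ok: false, error: err }
  }
}

// A destination that simply discards what it receives.
const makeSink = () => new Writable({ write(chunk, encoding, done) { done() } })

// A source that ends normally.
copyWithErrorHandling(Readable.from(['a', 'b']), makeSink())
  .then(result => console.log('normal source ok:', result.ok))

// A source that destroys itself on first read, before it has ended.
const broken = new Readable({ read() { this.destroy() } })
copyWithErrorHandling(broken, makeSink())
  .then(result => console.log('broken source ok:', result.ok))
```

Because each copy has its own pipeline, a source that closes early surfaces as a rejected promise for that copy only, and the caller can decide whether to retry, skip, or abort.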

stream.pipeline() leaves dangling event listeners on the streams after the callback has been invoked. In the case of reuse of streams after failure, this can cause event listener leaks and swallowed errors.

Node source (14.4):

  const onclose = () => {
    if (readable && !readableEnded) {
      if (!isReadableEnded(stream))
        return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
    }
    if (writable && !writableFinished) {
      if (!isWritableFinished(stream))
        return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
    }
    callback.call(stream);
  };
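
The check in that onclose handler can be observed directly through stream.finished(), which reports whether a single stream ended cleanly. This is a small sketch with built-in modules only; watch is a hypothetical helper name.

```javascript
const { PassThrough, finished } = require('stream')

// Hypothetical helper: resolve with the error (if any) that finished() reports.
function watch(action) {
  return new Promise(resolve => {
    const s = new PassThrough()
    finished(s, err => resolve(err))
    action(s)
  })
}

// Ended and fully read: both 'finish' and 'end' fire before 'close'.
watch(s => { s.resume(); s.end() })
  .then(err => console.log('ended:', err ? err.code : 'no error'))

// Destroyed: 'close' fires while the stream is neither ended nor finished.
watch(s => s.destroy())
  .then(err => console.log('destroyed:', err ? err.code : 'no error'))
```

The destroyed stream is reported with the code ERR_STREAM_PREMATURE_CLOSE, because it closed while neither ended nor finished.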

Sceat answered Oct 03 '22 06:10