Find out if the stream chunk is the last

When writing a Node.js Transform stream, how can I tell inside the transform function whether the current chunk is the last one, i.e. that no more chunks are coming?

_transform(chunk: any, encoding: string, callback: Function): void {
    // Chunks are accumulated into a buffer here.
    // The whole buffer has to be processed once all chunks have arrived,
    // so I need to know when that point is reached.
}

In other words, I need to know when no more chunks will arrive, so that I can process the buffer built from all the chunks and then push the processed data out of the stream.

Arman P. asked Sep 29 '17 18:09


3 Answers

Inside _transform you cannot tell whether more data will follow.

Depending on your use case, you would either listen for the 'end' event or implement _flush.

From the Node.js stream documentation for transform._flush(callback):

Custom Transform implementations may implement the transform._flush() method. This will be called when there is no more written data to be consumed, but before the 'end' event is emitted signaling the end of the Readable stream.

Within the transform._flush() implementation, the readable.push() method may be called zero or more times, as appropriate. The callback function must be called when the flush operation is complete.
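
For the use case in the question, a minimal sketch of the _flush approach could look like the following. The class name WholeBufferTransform, the helper run, and the upper-casing step are placeholders chosen for illustration; the real processing would go where the placeholder is.

```javascript
const { Transform, Readable } = require('node:stream');

// Hypothetical example class: stores every chunk, then emits one processed result.
class WholeBufferTransform extends Transform {
  constructor(options) {
    super(options);
    this.chunks = [];
  }

  _transform(chunk, encoding, callback) {
    // There is no way to know here whether this is the last chunk, so just keep it.
    this.chunks.push(chunk);
    callback();
  }

  _flush(callback) {
    // Called once, after the last chunk has been transformed and before 'end'.
    const whole = Buffer.concat(this.chunks);
    // Placeholder processing step (assumption): upper-case the text.
    this.push(whole.toString('utf8').toUpperCase());
    callback();
  }
}

// Demo helper (hypothetical): feeds the given strings through the transform
// and collects whatever comes out of the readable side.
async function run(inputChunks) {
  const out = [];
  const source = Readable.from(inputChunks.map((c) => Buffer.from(c)));
  for await (const piece of source.pipe(new WholeBufferTransform())) {
    out.push(piece.toString());
  }
  return out;
}
```

Calling run(['ab', 'cd', 'ef']) should resolve to a single output chunk holding the processed whole, because nothing is pushed until _flush runs. Note that this keeps the entire input in memory, which is only reasonable when the input is known to be small enough.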

t.niese answered Sep 30 '22 17:09


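Example: hold each chunk back by one step, so that the chunk still held when _flush runs is known to be the last one.

const { Transform } = require('stream');

const COUNTER_NULL_SYMBOL = Symbol('COUNTER_NULL_SYMBOL');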

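// Wraps each incoming object as {data, counter, last, first}. Every chunk is
// held back until the next one arrives, so the chunk still held when the
// input ends can be emitted with last: true.
const Counter = () => {
  let data = COUNTER_NULL_SYMBOL; // held-back chunk; sentinel until the first chunk arrives
  let counter = 1;                // 1-based position of the held-back chunk
  let first = true;               // true until the first chunk has been emitted

  return new Transform({
    objectMode: true,
    decodeStrings: false,
    highWaterMark: 1,

    transform(chunk, encoding, callback) {
      if (data !== COUNTER_NULL_SYMBOL) {
        // A newer chunk has arrived, so the held-back one is not the last.
        this.push({data, counter, last: false, first});
        first = false;
        counter++;
      }
      data = chunk;
      callback();
    },

    // Runs once after all input has been written, before 'end' is emitted.
    flush(callback) {
      if (data !== COUNTER_NULL_SYMBOL) {
        this.push({data, counter, last: true, first});
      }
      callback();
    },
  });
};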
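
To see what such a hold-back transform produces, here is a condensed sketch that runs on its own. The names tagLast and collect are hypothetical, and the emitted objects are reduced to {data, last} to keep the snippet short; it is an illustration of the same idea, not a drop-in replacement for the code above.

```javascript
const { Transform, Readable } = require('node:stream');

// Condensed, hypothetical variant of the hold-back idea: emits {data, last}.
function tagLast() {
  const EMPTY = Symbol('empty'); // sentinel: nothing held back yet
  let held = EMPTY;
  return new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      // The previously held chunk now has a successor, so it is not the last.
      if (held !== EMPTY) this.push({ data: held, last: false });
      held = chunk;
      callback();
    },
    flush(callback) {
      // Input is over: whatever is still held is the last chunk.
      if (held !== EMPTY) this.push({ data: held, last: true });
      callback();
    },
  });
}

// Demo helper (hypothetical): streams the items through tagLast() and collects the results.
async function collect(items) {
  const out = [];
  for await (const item of Readable.from(items).pipe(tagLast())) out.push(item);
  return out;
}
```

With collect(['a', 'b', 'c']), only the object wrapping 'c' should carry last: true, and an empty input should produce no output at all. The cost of this pattern is that every chunk is delivered one step late.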
slava answered Sep 30 '22 17:09


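Using through2, which takes the flush function as the argument after the transform function:

const fs = require('fs');
const through2 = require('through2');

fs.createReadStream('/tmp/important.dat')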
  .pipe(through2(
    (chunk, enc, cb) => cb(null, chunk), // transform is a noop
    function (cb) { // FLUSH FUNCTION
      this.push('tacking on an extra buffer to the end');
      cb();
    }
  ))
  .pipe(fs.createWriteStream('/tmp/wut.txt'));   
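
If you would rather avoid the extra dependency, the same shape can be sketched with the built-in Transform constructor, which accepts transform and flush options. The names appendTrailer and demo are hypothetical, and the file streams are swapped for an in-memory source and sink so the snippet runs anywhere.

```javascript
const { Transform, Readable, Writable } = require('node:stream');
const { pipeline } = require('node:stream/promises');

// Pass-through transform that appends a trailer in flush(),
// mirroring the through2 example with core streams only.
function appendTrailer(trailer) {
  return new Transform({
    transform(chunk, encoding, callback) {
      callback(null, chunk); // forward each chunk unchanged
    },
    flush(callback) {
      this.push(trailer); // runs once, after the last chunk
      callback();
    },
  });
}

// Demo helper (hypothetical): in-memory source and sink instead of files.
async function demo() {
  const received = [];
  const sink = new Writable({
    write(chunk, encoding, callback) {
      received.push(chunk.toString());
      callback();
    },
  });
  await pipeline(Readable.from([Buffer.from('body ')]), appendTrailer('<end>'), sink);
  return received.join('');
}
```

Here demo() should resolve to the original data followed by the trailer. pipeline is used instead of chained .pipe() calls so that an error in any stage rejects the returned promise.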
Gab answered Sep 30 '22 19:09