Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How do you push data to a readable stream from within a function?

I'm trying to achieve the following:

  • Function getPaths reads directory paths and pushes them into readable stream as it finds them
  • The readable stream keeps piping (streaming) incoming paths into the write stream as it receives the paths.

Code

const fs = require('fs')
const zlib = require('zlib')
const zip = zlib.createGzip()
const Stream = require('stream')


// Destination file for the gzip-compressed output.
let wstream = fs.createWriteStream('C:/test/file.txt.gz') 
// Object-mode source stream; getPaths() feeds it through readable.push(path).
let readable = new Stream.Readable({
  objectMode: true,
  // NOTE(review): Node invokes read(size) with a numeric size hint, so `item`
  // is presumably a number here rather than a path. Pushing it forwards that
  // number to gzip, which looks like the source of the ERR_INVALID_ARG_TYPE
  // reported for the asynchronous variant. Confirm against the stream docs.
  read(item) {
    this.push(item)
  }
})

// Chain: readable -> gzip -> file. The 'finish' listener is attached to
// whatever the last pipe() call returns.
readable.pipe(zip).pipe(wstream)
.on('finish', (err) => {
  console.log('done');
})

let walkdir = require('walkdir')
// Walks `dir` through walkdir.sync and pushes every reported path into the
// `readable` stream, logging once per push.
function getPaths(dir) {
  // NOTE(review): walkdir.sync presumably blocks until the whole walk is done,
  // so the piped streams get no chance to run between pushes. That matches the
  // reported log, where every push happens before any data reaches the file.
  let walker = walkdir.sync(dir, {"max_depth": 0, "track_inodes": true}, (path, stat) => {
    readable.push(path)
    console.log('pushing a path to readable')
  }) 
}
// Run the walk, then close the stream once getPaths() has returned.
getPaths("C:/")
console.log('getPaths() ran')
readable.push(null)  // indicates the end of the stream

Problem

The paths are not compressed and written to the file as the getPaths function finds them and pushes them into the stream; nothing is written until all of them have been found. I know it's probably because the process is synchronous, but I cannot figure out how to make it work.

I see the following output from the logs:

> // .gz file gets created with size of 0
> // Nothing happens for about 1 minute
> x(184206803) "pushing a path to readable"
> "getPaths() ran"
> // I see the data started being written into the file
> "Done"

UPDATE:

And if I do this asynchronously like this (or use the code from the answer below):

let walker = walkdir(dir, {"max_depth": 0, "track_inodes": true})
  walker.on('path', (path, stat) => {
    readable.push(path)
  }) 
  walker.on('end', (path, stat) => {
    readable.push(null)
  }) 

  ...

  // readable.push(null) 

I get an error. (I think it throws this particular error when it doesn't receive the expected data chunk after you're done pushing data into it. If you remove the last line from the code, readable.push(null), and run the code again, it throws the same error.)

TypeError [ERR_INVALID_ARG_TYPE]: The "chunk" argument must be one of type
 string or Buffer. Received type number
like image 356
Un1 Avatar asked Nov 06 '22 20:11

Un1


1 Answers

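Your code is very good and nearly works as is. You just need to remove this.push(item) and give the read function an empty body: the stream calls read(size) with a number, so this.push(item) pushes that number into gzip, which is what triggers the ERR_INVALID_ARG_TYPE error.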

Here is a working snippet

const fs = require('fs')
const zlib = require('zlib')
const zip = zlib.createGzip()
const Stream = require('stream')


// File that receives the gzip-compressed output.
let wstream = fs.createWriteStream('C:/test/file.txt.gz')

// Push-driven object-mode source: read() does nothing because the data is
// supplied from outside through readable.push().
let readable = new Stream.Readable({
  objectMode: true,
  read() {},
})

// readable -> gzip -> file; log once the end of the chain emits 'finish'.
readable
  .pipe(zip)
  .pipe(wstream)
  .on('finish', () => {
    console.log('done');
  })

let walkdir = require('walkdir')
/**
 * Starts a walkdir walk of `dir` and forwards every reported path into the
 * `readable` stream, closing the stream when the walk ends.
 *
 * @param {string} dir - Directory the walk starts from.
 */
function getPaths(dir) {
  const walkOptions = { max_depth: 0, track_inodes: true }
  const walker = walkdir(dir, walkOptions)

  // Each discovered path becomes one chunk of the readable stream.
  const forwardPath = (path) => {
    readable.push(path)
  }
  // Pushing null marks the end of the stream.
  const closeStream = () => {
    readable.push(null)
  }

  walker.on('path', forwardPath)
  walker.on('end', closeStream)
}
// Start the walk. getPaths() only registers listeners, so (assuming walkdir
// emits its events asynchronously) this log line prints before any path is
// pushed into the stream.
getPaths("C:/")
console.log('getPaths() ran')

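BTW, the conventional argument name is read(size). It is an advisory hint for how much data the stream would like to read: a number of bytes, or a number of objects when objectMode is on.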

EDIT There is no need for the readable stream. You can write directly to zip.

const fs = require('fs');
const zlib = require('zlib');
// Gzip transform that getPaths() writes into directly.
const zip = zlib.createGzip();
// File that receives the compressed bytes.
const wstream = fs.createWriteStream('C:/test/file.txt.gz');

// pipe() hands back its destination, so the listener sits on the file stream.
zip.pipe(wstream);
wstream.on('finish', () => {
  console.log('done');
});

let walkdir = require('walkdir')
/**
 * Walks `dir` asynchronously with walkdir and streams every discovered path
 * into the gzip stream `zip`, one path per line, ending `zip` when the walk
 * is complete.
 *
 * The walk is paused whenever `zip` reports that its internal buffer is full
 * and resumed on 'drain', so a very large tree does not pile up in memory.
 *
 * @param {string} dir - Directory the walk starts from.
 */
function getPaths(dir) {
  const walker = walkdir(dir, {"max_depth": 0, "track_inodes": true});
  // True while the walker is paused and a 'drain' listener is pending.
  let waitingForDrain = false;

  walker.on('path', (path) => {
    // Newline-terminate each entry so paths stay separable after decompression.
    const hasRoom = zip.write(`${path}\n`);
    // Events already in flight may still arrive after pause(); the flag keeps
    // them from stacking up extra 'drain' listeners.
    if (!hasRoom && !waitingForDrain) {
      waitingForDrain = true;
      walker.pause();
      zip.once('drain', () => {
        waitingForDrain = false;
        walker.resume();
      });
    }
  });

  walker.on('end', () => {
    zip.end();
  });
}
// Start the walk. getPaths() only registers listeners, so (assuming walkdir
// emits its events asynchronously) this log line prints before any path is
// written to the gzip stream.
getPaths("C:/")
console.log('getPaths() ran')
like image 91
Avraham Avatar answered Nov 10 '22 00:11

Avraham