fs.createWriteStream doesn't use back-pressure when writing data to a file, causing high memory usage

Problem

I'm trying to scan a drive directory (recursively walk all the paths) and write each path to a file as it's found, using fs.createWriteStream in order to keep memory usage low. It doesn't work: memory usage reaches 2 GB during the scan.

Expected

I was expecting fs.createWriteStream to automatically handle memory/disk usage at all times, keeping memory usage at a minimum with back-pressure.

Code

const fs = require('fs')
const walkdir = require('walkdir')

let dir = 'C:/'

let options = {
  "max_depth": 0,
  "track_inodes": true,
  "return_object": false,
  "no_return": true,
}

const wstream = fs.createWriteStream("C:/Users/USERNAME/Desktop/paths.txt")

let walker = walkdir(dir, options)

walker.on('path', (path) => {
  wstream.write(path + '\n')
})

walker.on('end', (path) => {
  wstream.end()
})

Is it because I'm not using .pipe()? I tried creating a new Stream.Readable({ read() {} }) and then, inside the .on('path') handler, pushing paths into it with readable.push(path), but that didn't really work.
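
Roughly, that attempt looked like this (a sketch using the wstream and walker from above); presumably it doesn't help because nothing consults the return value of push(), so the paths just pile up in the readable's internal buffer instead of in the writable's:

const { Readable } = require('stream')

const readable = new Readable({ read() {} }) // no-op read(): data is pushed manually
readable.pipe(wstream)

walker.on('path', (path) => {
  // push() returns false when the readable's buffer is full, but the value is
  // ignored here, so nothing ever tells the walker to slow down
  readable.push(path + '\n')
})

walker.on('end', () => readable.push(null))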

UPDATE:

Method 2:

I tried the drain method proposed in the answers, but it doesn't help much: it reduces memory usage to 500 MB (which is still too much for a stream), but it slows the code down significantly (from seconds to minutes).

Method 3:

I also tried using readdirp. It uses even less memory (~400 MB) and is faster, but I don't know how to pause it and use the drain method there to reduce memory usage further (a guess at how that might look follows the code below):

const readdirp = require('readdirp')

let dir = 'C:/'
const wstream = fs.createWriteStream("C:/Users/USERNAME/Desktop/paths.txt")

readdirp(dir, {alwaysStat: false, type: 'files_directories'})
  .on('data', (entry) => {
    wstream.write(`${entry.fullPath}\n`)
  })
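
Presumably pausing would have to look something like this (an untested sketch; since readdirp hands back an ordinary Readable stream, its standard pause()/resume() methods should apply):

const fs = require('fs')
const readdirp = require('readdirp')

const wstream = fs.createWriteStream("C:/Users/USERNAME/Desktop/paths.txt")
const entryStream = readdirp('C:/', {alwaysStat: false, type: 'files_directories'})

entryStream.on('data', (entry) => {
  if (!wstream.write(`${entry.fullPath}\n`)) {
    // The write stream's buffer is full: stop reading entries until it drains
    entryStream.pause()
    wstream.once('drain', () => entryStream.resume())
  }
})

entryStream.on('end', () => wstream.end())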

Method 4:

I also tried doing this operation with a custom recursive walker. Even though it uses only 30 MB of memory, which is what I wanted, it is about 10 times slower than the readdirp method, and it is synchronous, which is undesirable (a sketch of an asynchronous variant follows after the code):

const fs = require('fs')
const path = require('path')

let dir = 'C:/'
function customRecursiveWalker(dir) {
  fs.readdirSync(dir).forEach(file => {
    let fullPath = path.join(dir, file)
    // Folders
    if (fs.lstatSync(fullPath).isDirectory()) {
      fs.appendFileSync("C:/Users/USERNAME/Desktop/paths.txt", `${fullPath}\n`)
      customRecursiveWalker(fullPath)
    } 
    // Files
    else {
      fs.appendFileSync("C:/Users/USERNAME/Desktop/paths.txt", `${fullPath}\n`)
    }  
  })
}
customRecursiveWalker(dir)
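
An asynchronous variant would presumably look something like the following (an untested sketch assuming Node 11.13+ for events.once; errors such as permission failures are simply skipped). It keeps a single write stream open and waits for 'drain' whenever write() reports back-pressure, instead of reopening the file for every path:

const fs = require('fs')
const path = require('path')
const { once } = require('events')

const wstream = fs.createWriteStream("C:/Users/USERNAME/Desktop/paths.txt")

async function walk(dir) {
  let entries
  try {
    entries = await fs.promises.readdir(dir, { withFileTypes: true })
  } catch (err) {
    return // e.g. permission errors; decide what to ignore in a real run
  }
  for (const entry of entries) {
    const fullPath = path.join(dir, entry.name)
    if (!wstream.write(`${fullPath}\n`)) {
      await once(wstream, 'drain') // wait until the buffer has been flushed
    }
    if (entry.isDirectory()) {
      await walk(fullPath)
    }
  }
}

walk('C:/').then(() => wstream.end())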
Asked May 08 '19 by Un1


3 Answers

Preliminary observation: you've attempted to get the results you want using multiple approaches. One complication when comparing them is that they do not all do the same work. If you run tests on a file tree that contains only regular files and no mount points, you can probably compare the approaches fairly, but once you add mount points, symbolic links, etc., you may get different memory and time statistics merely because one approach excludes files that another includes.

I initially attempted a solution using readdirp, but unfortunately that library appears buggy to me. Running it on my system here, I got inconsistent results: one run would output 10 MB of data, another run with the same input parameters would output 22 MB, then I'd get yet another number, etc. I looked at the code and found that it does not respect the return value of push:

_push(entry) {
    if (this.readable) {
      this.push(entry);
    }
}

As per the documentation the push method may return a false value, in which case the Readable stream should stop producing data and wait until _read is called again. readdirp entirely ignores that part of the specification. It is crucial to pay attention to the return value of push to get proper handling of back-pressure. There are also other things that seemed questionable in that code.
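
For illustration only (this is not readdirp's actual code, and this._paused is a hypothetical flag), the minimal change would be for _push to record the back-pressure signal so that the directory-walking loop can stop until _read is called again:

_push(entry) {
    if (this.readable) {
        // push() returns false when the stream's internal buffer is full; the
        // walker must then stop producing entries until _read() is called again.
        this._paused = !this.push(entry);
    }
}

The rest of the walking code would then need to check that flag before producing more entries, which is essentially what the proof of concept below does explicitly.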

So I abandoned that and worked on a proof of concept showing how it could be done. The crucial parts are:

  1. When the push method returns false it is imperative to stop adding data to the stream. Instead, we record where we were, and stop.

  2. We start again only when _read is called.

If you uncomment the console.log statements that print START and STOP, you'll see them printed in succession on the console: we start, produce data until Node tells us to stop, then stop until Node tells us to start again, and so on.

const stream = require("stream");
const fs = require("fs");
const { readdir, lstat } = fs.promises;
const path = require("path");

class Walk extends stream.Readable {
  constructor(root, maxDepth = Infinity) {
    super();

    this._maxDepth = maxDepth;

    // These fields allow us to remember where we were when we have to pause our
    // work.

    // The path of the directory to process when we resume processing, and the
    // depth of this directory.
    this._curdir = [root, 1];

    // The directories still to process.
    this._dirs = [this._curdir];

    // The list of files to process when we resume processing.
    this._files = [];

    // The index in `this._files` at which to continue processing when we resume.
    this._ix = 0;

    // A flag recording whether or not the fetching of files is currently going
    // on.
    this._started = false;
  }

  async _fetch() {
    // Recall where we were by loading the state in local variables.
    let files = this._files;
    let dirs = this._dirs;
    let [dir, depth] = this._curdir;
    let ix = this._ix;

    while (true) {
      // If we've gone past the end of the files we were processing, then
      // just forget about them. This simplifies the code that follows a bit.
      if (ix >= files.length) {
        ix = 0;
        files = [];
      }

      // Read directories until we have files to process.
      while (!files.length) {
        // We've read everything, end the stream.
        if (dirs.length === 0) {
          // This is how the stream API requires us to indicate the stream has
          // ended.
          this.push(null);

          // We're no longer running.
          this._started = false;
          return;
        }

        // Here, we get the next directory to process and get the list of
        // files in it.
        [dir, depth] = dirs.pop();

        try {
          files = await readdir(dir, { withFileTypes: true });
        }
        catch (ex) {
          // This is a proof-of-concept. In a real application, you should
          // determine what exceptions you want to ignore (e.g. EPERM).
        }
      }

      // Process each file.
      for (; ix < files.length; ++ix) {
        const dirent = files[ix];
        // Don't include in the results those files that are not directories,
        // files or symbolic links.
        if (!(dirent.isFile() || dirent.isDirectory() || dirent.isSymbolicLink())) {
          continue;
        }

        const fullPath = path.join(dir, dirent.name);
        if (dirent.isDirectory() && depth < this._maxDepth) {
          // Keep track that we need to walk this directory.
          dirs.push([fullPath, depth + 1]);
        }

        // Finally, we can put the data into the stream!
        if (!this.push(`${fullPath}\n`)) {
          // If the push returned false, we have to stop pushing results to the
          // stream until _read is called again, so we have to stop.

          // Uncomment this if you want to see when the stream stops.
          // console.log("STOP");

          // Record where we were in our processing.
          this._files = files;
          // The element at ix *has* been processed, so ix + 1.
          this._ix = ix + 1;
          this._curdir = [dir, depth];

          // We're stopping, so indicate that!
          this._started = false;
          return;
        }
      }
    }
  }

  async _read() {
    // Do not start the process that puts data on the stream over and over
    // again.
    if (this._started) {
      return;
    }

    this._started = true; // Yep, we've started.

    // Uncomment this if you want to see when the stream starts.
    // console.log("START");

    await this._fetch();
  }
}

// Change the paths to something that makes sense for you.
stream.pipeline(new Walk("/home/", 5),
                fs.createWriteStream("/tmp/paths3.txt"),
                (err) => console.log("ended with", err));

When I run the first attempt you made with walkdir here, I get the following statistics:

  • Elapsed time (wall clock): 59 sec
  • Maximum resident set size: 2.90 GB

When I use the code I've shown above:

  • Elapsed time (wall clock): 35 sec
  • Maximum resident set size: 0.1 GB

The file tree I use for the tests produces a file listing of 792 MB.

Answered by Louis


You could exploit the return value of WritableStream.write(): it essentially tells you whether you should continue to read or not. A WritableStream has an internal property (highWaterMark) that stores the threshold after which the buffer should be flushed to the OS. The drain event is emitted when the buffer has been flushed, i.e. you can safely call WritableStream.write() again without risking filling the buffer (and therefore the RAM) excessively. Luckily for you, walkdir lets you control the process: the object it returns has pause() (pause the walk; no more events will be emitted until resume) and resume() (resume the walk) methods, so you can pause and resume the walk according to the state of your write stream. Try this, using the walker and wstream variables from your original code:

let is_emitter_paused = false;
wstream.on('drain', (evt) => {
    if (is_emitter_paused) {
        walker.resume();
    }
});

walker.on('path', function(path, stat) {
    is_emitter_paused = !wstream.write(path + '\n');

    if (is_emitter_paused) {
        walker.pause();
    }
});
Answered by Marco Luzzara


Here's an implementation inspired by @Louis's answer. I think it's a bit easier to follow and in my minimal testing it performs about the same.

const fs = require('fs');
const path = require('path');
const stream = require('stream');

class Walker extends stream.Readable {
    constructor(root = process.cwd(), maxDepth = Infinity) {
        super();

        // Dirs to process
        this._dirs = [{ path: root, depth: 0 }];

        // Max traversal depth
        this._maxDepth = maxDepth;

        // Files to flush
        this._files = [];
    }

    _drain() {
        while (this._files.length > 0) {
            const file = this._files.pop();
            if (file.isFile() || file.isDirectory() || file.isSymbolicLink()) {
                const filePath = path.join(this._dir.path, file.name);
                if (file.isDirectory() && this._maxDepth > this._dir.depth) {
                    // Add directory to be walked at a later time
                    this._dirs.push({ path: filePath, depth: this._dir.depth + 1 });
                }
                if (!this.push(`${filePath}\n`)) {
                    // Halt walking
                    return false;
                }
            }
        }
        if (this._dirs.length === 0) {
            // Walking complete
            this.push(null);
            return false;
        }

        // Continue walking
        return true;
    }

    async _step() {
        try {
            this._dir = this._dirs.pop();
            this._files = await fs.promises.readdir(this._dir.path, { withFileTypes: true });
        } catch (e) {
            this.emit('error', e); // Uh oh...
        }
    }

    async _walk() {
        this.walking = true;
        while (this._drain()) {
            await this._step();
        }
        this.walking = false;
    }

    _read() {
        if (!this.walking) {
            this._walk();
        }
    }

}

stream.pipeline(new Walker('some/dir/path', 5),
    fs.createWriteStream('output.txt'),
    (err) => console.log('ended with', err));
Answered by Jake Holzinger