Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Meteor: How do I stream and parse a large file to an async Node function?

I'm using the job-collection package to do the following:

  1. Download a large file with a bunch of metadata about webpages
  2. Create a stream from the file metadata that is split by a regex using the NPM event-stream package
  3. Check if there is a match of the metadata in a collection (I've been attempting to stream each webpage's metadata to another function to do this)

The file is too large to buffer, so streaming is required. Here is a small file with a few examples of the metadata if you wish to try this.

Each job from the job-collection package is already inside an async function:

var request = Npm.require('request');
var zlib = Npm.require('zlib');
var EventStream = Meteor.npmRequire('event-stream');

function (job, callback) {

//This download is much too long to block
  request({url: job.fileURL, encoding: null}, function (error, response, body) {
    if (error) console.error('Error downloading File');
    if (response.statusCode !== 200) console.error(downloadResponse.statusCode, 'Status not 200');

    var responseEncoding = response.headers['content-type'];
    console.log('response encoding is %s', responseEncoding);
    if (responseEncoding === 'application/octet-stream' || 'binary/octet-stream') {
      console.log('Received binary/octet-stream');
      var regexSplit = /WARC\/1\./;
      response.pipe(zlib.createGunzip()
              .pipe(EventStream.split(regexSplit))
              .pipe(EventStream.map(function (webpageMetaData) {
      /* Need parse the metaData or pass each webpageMetaData to function
       * This next function could block if it had to */
      searchPageMetaData(webpageMetaData); // pass each metadatum to this function to update a collection - this function can be synchronous
    }));
    } else {
      console.error('Wrong encoding');
    }
  });
}

function searchWebPageMetaData(metaData) {
  //  Parse JSON and search collection for match
}
  • Are there better ways to structure this? Am I on the right track?
  • Where to put Meteor.bindEnvironment? - do I I bind the environment for each time I pass to searchWebPageMetaData()? Do I need to expressly use fibers here?
  • The stream stops when running this if I run it to process.stdout. Am I supposed to put the stream into one of Meteor's wrap
  • I'm aware of Meteor.wrapAsync. Do I want to wrap the innermost searchWebPageMetaData() function in Meteor.wrapAsync? (think I'm answering this yes as I type)
  • Will the stream slow to compensate for the slowness of the DB calls? My guess is no but how do I deal with that?

I've spent quite a while learning about Meteor's wrapAsync, and bindEnvironment but having trouble bringing it all together and understanding where to use them.

SUPPLEMENT 1

Just to clarify, the steps are:

  1. Download file;
  2. Create stream;
  3. unzip it;
  4. split it into individual webPages - EventStream handles this
  5. send it to a function - don't need return values; this could be blocking, it's just some searching and database call

I was trying to do something like this, except the core code I need help with was in a function in a different file. The following code has most of @electric-jesus' answer in there.

   processJobs('parseWatFile', {
     concurrency: 1,
     cargo: 1,
     pollInterval: 1000,
     prefetch: 1
   }, function (job, callback) {

     if (job.data.watZipFileLink) {
       queue.pause();
       console.log('queue should be paused now');


       var watFileUrl = 'https://s3.amazonaws.com/ja-common-crawl/exampleWatFile.wat.gz';
       function searchPageMetaData(webpageMetaData, callback) {
         console.log(webpageMetaData);  // Would be nice to just get this function logging each webPageMetaData
         future.return(callback(webpageMetaData));  //I don't need this to return any value - do I have to return something?
     }

      if (!watFile)
        console.error('No watFile passed to downloadAndSearchWatFileForEntity ');

      var future = new Future(); // Doc Brown would be proud.

      if(typeof callback !== 'function') future.throw('callbacks are supposed to be functions.');

    request({url: watFile, encoding: null}, function (error, response, body) {

      if (error)                        future.throw('Error Downloading File');
      if (response.statusCode !== 200)  future.throw('Expected status 200, got ' + response.statusCode + '.');

      var responseEncoding = response.headers['content-type'];

    if (responseEncoding === 'application/octet-stream' || 'binary/octet-stream') {

      var regexSplit = /WARC\/1\./;
      response.pipe(zlib.createGunzip()
        .pipe(EventStream.split(regexSplit))
        .pipe(EventStream.map(function (webpageMetaData) {
        searchPageMetaData(webpageMetaData, callback);
      })
    ));
    } else {
      future.throw('Wrong encoding');
    }
    });

    return future.wait();

    } else {
      console.log('No watZipFileLink for this job');
      job.log('ERROR: NO watZipFileLink from commonCrawlJob collection');
    }
      queue.resume();
      job.done;
      callback();
  }
like image 672
JohnAllen Avatar asked Nov 06 '14 06:11

JohnAllen


2 Answers

Interesting, looks alright. I've never worked with job-collection but it seems to be just a Mongo-driven task queue.. so I am assuming it works like a regular queue. I've always found for stuff with callback, I most certainly use the Future pattern. e.g:

var request = Npm.require('request');
var zlib = Npm.require('zlib');
var EventStream = Meteor.npmRequire('event-stream');

var Future = Npm.require('fibers/future');


var searchWebPageMetaData = function (metaData) {
  //  Parse JSON and search collection for match
  // make it return something
  var result = /droids/ig.test(metaData);
  return result;
}

var processJob = function (job, callback) {

  var future = new Future(); // Doc Brown would be proud.

  if(typeof callback !== 'function') future.throw("Oops, you forgot that callbacks are supposed to be functions.. not undefined or whatever.");

  //This download is much too long to block
  request({url: job.fileURL, encoding: null}, function (error, response, body) {

    if (error)                        future.throw("Error Downloading File");
    if (response.statusCode !== 200)  future.throw("Expected status 200, got " + downloadResponse.statusCode + ".");

    var responseEncoding = response.headers['content-type'];


    if (responseEncoding === 'application/octet-stream' || 'binary/octet-stream') {

      var regexSplit = /WARC\/1\./;
      response.pipe(zlib.createGunzip()
              .pipe(EventStream.split(regexSplit))
              .pipe(EventStream.map(function (webpageMetaData) {
      /* Need parse the metaData or pass each webpageMetaData to function
       * This next function could block if it had to */

      // pass each metadatum to this function to update a collection - this function can be synchronous

      future.return(callback(webpageMetaData)); // this way, processJob returns whatever we find in the completed webpage, via callback.

    }));
    } else {
      future.throw('Wrong encoding');
    }
  });

  return future.wait();
}

Example usage:

so whenever you assign variables here:

var currentJob = processJob(myjob, searchWebPageMetaData);

and even with synchronous type obtainment/variable assignment, you get your async stuff done and transported just-in-time for you.

To answer your questions,

  • Where to put Meteor.bindEnvironment? - do I I bind the environment for each time I pass to searchWebPageMetaData()? Do I need to expressly use fibers here?

    not really, i believe the explicit use of fibers/future already take care of this.

  • The stream stops when running this if I run it to process.stdout. Am I supposed to put the stream into one of Meteor's wrap

    how do you mean? I vaguely remember process.stdout is blocking, that might be a cause. again, wrapping the result in a future should take care of this.

  • I'm aware of Meteor.wrapAsync. Do I want to wrap the innermost searchWebPageMetaData() function in Meteor.wrapAsync? (think I'm answering this yes as I type)

    Take a look at the Meteor.wrapAsync helper code. It's basically a future resolution applied, of course you can do this then again you can also explicitly use fibers/future on its own with no problem.

  • Will the stream slow to compensate for the slowness of the DB calls? My guess is no but how do I deal with that?

    Not really sure what you mean here.. but since we're trying to use asynchronous fibers, my guess is no as well. I've yet to see any slowness with the use of fibers. Probably only if there are multiple jobs launched (and concurrently running) at once, then you will have a performance issue in terms of memory usages. Keep the concurrent queue low as Fibers can be quite powerful in running stuff at the same time. You only have one core to process it all, that's a sad fact as node can't multi-core :(

like image 199
Seth Malaki Avatar answered Nov 15 '22 05:11

Seth Malaki


This is quite tricky if you want to handle all errors correctly. So one should ask themself, what to do if: you code throws an exception, or error event handler is called. You want that errors propagate correctly, that is, are thrown as an exception in the fiber calling streaming code. I implemented something like this for one of our job-collecton tasks, for extracting tar files.

First you need some helper functions:

bindWithFuture = (futures, mainFuture, fun, self) ->
  wrapped = (args...) ->
    future = new Future()

    if mainFuture
      future.resolve (error, value) ->
        # To resolve mainFuture early when an exception occurs
        mainFuture.throw error if error and not mainFuture.isResolved()
        # We ignore the value

    args.push future.resolver()
    try
      futures.list.push future
      fun.apply (self or @), args
    catch error
      future.throw error

    # This waiting does not really do much because we are
    # probably in a new fiber created by Meteor.bindEnvironment,
    # but we can still try to wait
    Future.wait future

  Meteor.bindEnvironment wrapped, null, self

wait = (futures) ->
  while futures.list.length
    Future.wait futures.list
    # Some elements could be added in meantime to futures,
    # so let's remove resolved ones and retry
    futures.list = _.reject futures.list, (f) ->
      if f.isResolved()
        # We get to throw an exception if there was an exception.
        # This should not really be needed because exception should
        # be already thrown through mainFuture and we should not even
        # get here, but let's check for every case.
        f.get()
        true # And to remove resolved

And then you can run something like:

mainFuture = new Future()

# To be able to override list with a new value in wait we wrap it in an object
futures =
  list: []

bindWithOnException = (f) =>
  Meteor.bindEnvironment f, (error) =>
    mainFuture.throw error unless mainFuture.isResolved()

onWebpageMetaData = (metaData, callback) =>
  return callback null if mainFuture.isResolved()

  # Do whatever you want here.
  # Call callback(null) when you finish.
  # Call callback(error) if there is an error.
  # If you want to call into a Meteor code inside some other callback for async code you use,
  # use bindWithOnException to wrap a function and stay inside a Meteor environment and fiber.

  MeteorCollection.insert
    metaData: metaData

  callback null

requestFuture = new Future()
request
  url: job.fileURL
  encoding: null
, 
  (error, response, body) ->
    return requestFuture.throw error if error
    return requestFuture.throw new Error "Expected status 200, got #{ response.statusCode }." unless response.statusCode is 200
    requestFuture.return response

response = requestFuture.wait()

responseEncoding = response.headers['content-type']
throw new Error "Wrong encoding" unless responseEncoding in ['application/octet-stream', 'binary/octet-stream']

regexSplit = /WARC\/1\./

response.pipe(
  zlib.createGunzip()
).pipe(
  EventStream.split regexSplit
).pipe(
  EventStream.map bindWithFuture futures, mainFuture, onWebpageMetaData
).on('end', =>
  # It could already be resolved by an exception from bindWithFuture or bindWithOnException
  mainFuture.return() unless mainFuture.isResolved()
).on('error', (error) =>
  # It could already be resolved by an exception from bindWithFuture or bindWithOnException
  mainFuture.throw error unless mainFuture.isResolved()
)

mainFuture.wait()
wait futures
like image 21
Mitar Avatar answered Nov 15 '22 05:11

Mitar