
How to write a live data set to disk with async I/O?

I am new to developing in node.js (though relatively experienced at client-side javascript) and I'm running into lots of questions about good practices when dealing with asynchronous operations in node.js.

My specific issue (though I imagine this is a fairly general purpose topic) is that I have a node.js app (running on a Raspberry Pi) that is recording the readings from several temperature probes every 10 seconds to an in memory data structure. This works just fine. The data accumulates over time in memory and, as it accumulates and reaches a particular size threshold, the data is regularly aged (keeping only the last N days of data) to keep it from growing beyond a certain size. This temperature data is used to control some other appliances.

Then, I have a separate interval timer that writes this data out to disk every so often (to persist it if the process crashes). I'm using async node.js (fs.open(), fs.write() and fs.close()) disk IO to write the data out to disk.

And, because of the async nature of the disk IO, it occurs to me that the very data structure I'm trying to write to disk may get modified right in the middle of me writing it out to disk. That would potentially be a bad thing. If data is only appended to the data structure while writing out to disk, that won't actually cause a problem with the way I'm writing the data, but there are some circumstances where earlier data can be modified as new data is being recorded and that would really mess with the integrity of what I'm in the middle of writing to disk.

I can think of all sorts of somewhat ugly safeguards I could put in my code such as:

  1. Switch to synchronous IO to write the data to disk (don't really want to do that for server responsiveness reasons).
  2. Set a flag when I started writing data and don't record any new data while that flag is set (causes me to lose the recording of data during the write).
  3. More complicated versions of option 2 where I set the flag and when the flag is set, new data goes in a separate, temporary data structure that when the file IO is done is then merged with the real data (doable, but seems ugly).
  4. Take a snapshot copy of the original data and take your time to write that copy to disk knowing that nobody else will be modifying the copy. I don't want to do this because the data set is relatively large and I'm in a limited memory environment (Raspberry PI).
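For illustration, option 3 could be sketched roughly like this (the helper names here are hypothetical, not from any library):

```javascript
var pending = [];     // temporary structure for readings that arrive mid-write
var writing = false;  // flag set while the async disk write is in flight

function record(store, reading) {
    if (writing) {
        pending.push(reading);  // divert new data while the file IO is running
    } else {
        store.push(reading);
    }
}

function beginWrite() {
    writing = true;
}

function endWrite(store) {
    writing = false;
    store.push.apply(store, pending);  // merge the buffered readings back in
    pending.length = 0;
}
```

`beginWrite()` would be called just before starting the async write and `endWrite()` from its completion callback.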

So, my question is what are design patterns for writing a large data set with async IO when other operations may want to modify that data during the async IO? Are there more general purpose ways of handling my issue than the specific work-arounds listed above?

asked Sep 06 '14 by jfriend00

1 Answer

Your problem is data synchronization. Traditionally this is solved with locks/mutexes, but javascript/node doesn't really have anything like that built-in.

So, how do we solve this in node? We use queues. Personally, I use the queue function from the async module.

Queues work by keeping a list of tasks that need to be executed and only execute those tasks, in the order they're added to the queue, once the previous task has completed (similar to your option 3).

[animation: tasks entering a queue and being run one after another]

Note: The async module's queue method can actually run multiple tasks concurrently (like the animation above shows) but, since we're talking data synchronization here, we don't want that. Luckily we can tell it to just run one at a time.

In your particular situation what you'll want to do is set up a queue that can handle two types of tasks:

  1. Modify your data structure
  2. Write your data structure to disk

Whenever you get new data from your temperature probes, add the task to your queue to modify your data structure with that new data. Then, whenever your interval timer fires, add the task to your queue that writes your data structure to disk.

Since the queue will only run one task at a time, in the order they're added, it guarantees that you'll never be modifying your in-memory data structure while you're writing data to disk.

A very simple implementation of that might look like:

var async = require('async');
var fs = require('fs');

var dataQueue = async.queue(function(task, callback) {
    if (task.type === "newData") {
        memoryStore.add(task.data); // modify your data structure however you do it now
        callback(); // let the queue know the task is done; you can pass an error here as usual if needed
    } else if (task.type === "writeData") {
        fs.writeFile(task.filename, JSON.stringify(memoryStore), function(err) {
            // error handling
            callback(err); // let the queue know the task is done
        })
    } else {
        callback(new Error("Unknown Task")); // just in case we get a task we don't know about
    }
}, 1); // The 1 here is setting the concurrency of the queue so that it will only run one task at a time

// call when you get new probe data
function addNewData(data) {
    dataQueue.push({type: "newData", data: data}, function(err) {
        // called when the task is complete; optional
    });
}

// write to disk every 5 minutes
setInterval(function() {
    dataQueue.push({type: "writeData", filename: "somefile.dat"}, function(err) {
        // called when the task is complete; optional
    });
}, 300000); // 300000 ms = 5 minutes

Also note that you can now add data to your data structure asynchronously. Say you add a new probe that fires off an event whenever its value changes. You can just addNewData(data) as you do with your existing probes and not worry about it conflicting with in-progress modifications or disk writes (this really comes into play if you start writing to a database instead of an in-memory data store).


Update: A more elegant implementation using bind()

The idea is that you use bind() to bind arguments to a function and then push the new bound function that bind() returns onto the queue. That way you don't need to push a custom object onto the queue that the worker has to interpret; you can just give it a function to call, already set up with the correct arguments. The only caveat is that the function has to take a callback as its last argument.

That should allow you to use all the existing functions you have (possibly with slight modifications) and just push them onto the queue when you need to make sure they don't run concurrently.

I threw this together to test the concept:

var async = require('async');

var dataQueue = async.queue(function(task, callback) {
    // task is just a function that takes a callback; call it
    task(callback); 
}, 1); // The 1 here is setting the concurrency of the queue so that it will only run one task at a time

function storeData(data, callback) {
    setTimeout(function() { // simulate async op
        console.log('store', data);
        callback(); // let the queue know the task is done
    }, 50);
}

function writeToDisk(filename, callback) {
    setTimeout(function() { // simulate async op
        console.log('write', filename);
        callback(); // let the queue know the task is done
    }, 250);
}

// store data every second
setInterval(function() {
    var data = {date: Date.now()};
    var boundStoreData = storeData.bind(null, data);
    dataQueue.push(boundStoreData, function(err) {
        console.log('store complete', data.date);
    });
}, 1000);

// write to disk every 2 seconds
setInterval(function() {
    var filename = Date.now() + ".dat";
    var boundWriteToDisk = writeToDisk.bind(null, filename);
    dataQueue.push(boundWriteToDisk, function(err) {
        console.log('write complete', filename);
    });
}, 2000);
answered Oct 02 '22 by Mike S