How can I stream a JSON array from Node.js to Postgres

I am trying to insert a couple of million records (with approximately 6 fields/columns each) into Postgres. Clients send the data in requests of 10,000 records per bulk-insert attempt, which I currently handle with sequelize.js and bulkCreate().

This obviously was a bad idea, so I looked into node-pg-copy-streams instead.

However, I do not want to initiate a change on the client side, where a json array is sent as such

# python
import json
import requests

data = [
    {
        "column a": "a values",
        "column b": "b values",
    },
    # ... 10,000 items ...
]
requests.post(url, data=json.dumps(data))

On the server side in Node.js, how would I stream the received req.body in the following skeleton?

.post(function(req, res){

    // old sequelize code
    /* table5.bulkCreate(
        req.body, {raw:true}
    ).then(function(){
        return table5.findAll();
    }).then(function(result){
        res.json(result.count);
    });*/

    // new pg-copy-streams code
    // (assumes var pg = require('pg'); var copyFrom = require('pg-copy-streams').from;)
    pg.connect(function(err, client, done) {
        var stream = client.query(copyFrom('COPY my_table FROM STDIN'));
        // My question is here: how would I stream or pipe the request body?
        // ?.on('error', done);
        // ?.pipe(stream).on('finish', done).on('error', done);
    });
});
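
Note that for req.body to already be the parsed array inside this handler, the route needs JSON body-parsing middleware. Assuming Express with body-parser (an assumption, not stated above), a minimal setup could look like the following; the raised limit matters because 10,000 records easily exceed the default of 100kb:

// Assumed setup (not shown in the question): Express + body-parser,
// with the JSON size limit raised for large batches.
var express = require('express');
var bodyParser = require('body-parser');

var app = express();
app.use(bodyParser.json({ limit: '10mb' })); // default limit is 100kb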
asked Jan 08 '16 by Sri

1 Answer

Here's how I solved my problem.

First, a function that converts the dictionaries in req.body plus a set of keys to a tab-separated-values (TSV) string (not part of the original question):

/**
 * Converts an array of dictionaries and a set of keys to a tab-separated-values blob of text
 * @param {Object[]} dicts - array of record objects
 * @param {string[]} keys - column keys to extract from each object
 * @return {string} concatenated tab-separated values
 */
function convertDictsToTSV(dicts, keys){
    // ...
}
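
The body is left out above; a minimal sketch of one possible implementation is shown below (an assumption, not the author's code; real values would also need tab/newline/backslash escaping to be valid input for COPY ... FROM STDIN in text format):

// Possible implementation sketch (assumption, not the original code):
// join each dict's values for the given keys with tabs, one row per line.
function convertDictsToTSV(dicts, keys){
    return dicts.map(function(dict){
        return keys.map(function(key){
            return dict[key]; // real data would need COPY text-format escaping
        }).join('\t');
    }).join('\n') + '\n';
}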

Second, the rest of the original .post handler:

.post(function(req, res){
    // ...
    /* requires:
     * var stream = require('stream');
     * var pg = require('pg');
     * var copyFrom = require('pg-copy-streams').from;
     */
    var read_stream_string = new stream.Readable();
    // a Readable must implement _read; a no-op is enough because data is pushed manually
    read_stream_string._read = function noop() {};
    var keys = [...]; // set of dictionary keys to extract from req.body
    read_stream_string.push(convertDictsToTSV(req.body, keys));
    read_stream_string.push(null); // signal end of data
    pg.connect(connectionString, function(err, client, done) {
        // ...
        // error handling
        // ...
        var copy_string = 'COPY tablename (' + keys.join(',') + ') FROM STDIN';
        var pg_copy_stream = client.query(copyFrom(copy_string));
        read_stream_string.pipe(pg_copy_stream).on('finish', function(){
            // handle success, call done() and send the response
        }).on('error', function(error){
            // handle the error, call done(error) and send the response
        });
    });
    // note: pg.end() shuts down the shared pool, so call it on application
    // shutdown rather than after every request
});
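
For reference, with newer versions of node-postgres (6 or later) and Node.js (12 or later), the same idea can be expressed with a connection pool and stream.pipeline. This is only a sketch under those assumptions; tablename, connectionString, and the column list are placeholders, and convertDictsToTSV is the helper from above:

// Sketch only: pool + pipeline variant (assumes pg >= 6, Node.js >= 12,
// pg-copy-streams, and the convertDictsToTSV helper defined earlier).
const { Pool } = require('pg');
const { Readable, pipeline } = require('stream');
const copyFrom = require('pg-copy-streams').from;

const pool = new Pool({ connectionString: connectionString });

function copyRecords(dicts, keys, callback) {
    pool.connect(function(err, client, release) {
        if (err) { return callback(err); }
        const source = Readable.from([convertDictsToTSV(dicts, keys)]);
        const target = client.query(
            copyFrom('COPY tablename (' + keys.join(',') + ') FROM STDIN'));
        pipeline(source, target, function(pipelineErr) {
            release();             // return the client to the pool
            callback(pipelineErr); // null on success
        });
    });
}

Inside the route handler this could then be called as copyRecords(req.body, keys, function(err){ ... }), with the response sent from the callback.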

answered Oct 24 '22 by Sri