I am trying to insert a couple of million records (with approximately 6 fields/columns each), receiving requests from clients with 10,000 records per bulk insert attempt (using sequelize.js and bulkCreate()).
This obviously turned out to be a bad idea, so I started looking into node-pg-copy-streams.
However, I do not want to initiate a change on the client side, where a JSON array is currently sent as follows:
# python
import json
import requests

data = [
    {
        "column a": "a values",
        "column b": "b values",
    },
    ...
    # 10,000 items
    ...
]
requests.post(url=url, data=json.dumps(data))
On the server side in Node.js, how would I stream the received request.body in the following skeleton?
.post(function(req, res){
    // old sequelize code
    /* table5.bulkCreate(
        req.body, {raw: true}
    ).then(function(){
        return table5.findAll();
    }).then(function(result){
        res.json(result.count);
    }); */

    // new pg-copy-streams code
    pg.connect(function(err, client, done) {
        var stream = client.query(copyFrom('COPY my_table FROM STDIN'));
        // My question is here: how would I stream or pipe the request body?
        // ?.on('error', done);
        // ?.pipe(stream).on('finish', done).on('error', done);
    });
});
Use the node-postgres module. The node-postgres module is an npm package that allows you to connect to and interact with a PostgreSQL database. There are two options for connecting Node with PostgreSQL using node-postgres: a single client or a connection pool.
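A minimal sketch of both options, assuming the current pg API (the connection string below is a placeholder):

var pg = require('pg');

// option 1: a single client
var client = new pg.Client('postgres://user:pass@localhost/mydb');
client.connect(function(err) {
    // run queries on client, then call client.end() when finished
});

// option 2: a connection pool
var pool = new pg.Pool({ connectionString: 'postgres://user:pass@localhost/mydb' });
pool.connect(function(err, client, release) {
    // run queries on client, then call release() to return it to the pool
});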
Streams are one of the fundamental concepts that power Node.js applications. They are a data-handling method used to read input and write output sequentially. Streams are a way to handle reading/writing files, network communication, or any kind of end-to-end information exchange efficiently.
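For example, a minimal sketch of pushing a string through a Readable stream and piping it to a writable destination (stdout here):

var stream = require('stream');

var source = new stream.Readable();
source._read = function noop() {}; // no-op; data is pushed manually below
source.push('column a\tcolumn b\n'); // enqueue some data
source.push(null); // signal end-of-stream

source.pipe(process.stdout); // pipe into any writable stream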
Here's how I solved my problem.
First, a function to convert my req.body dicts to TSV (not part of the initial problem):
/**
 * Converts an array of dictionaries and a set of keys to a Tab Separated Value blob of text
 * @param {Object[]} dicts
 * @param {string[]} keys
 * @return {string} concatenated Tab Separated Values
 */
function convertDictsToTSV(dicts, keys){
    // ...
}
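The body is omitted above since it was not part of the original problem, but a minimal sketch of one possible implementation might look like this (note: COPY's text format treats backslash, tab, and newline as special, so values need escaping, and NULLs would need to be emitted as \N):

// a sketch, not the author's actual implementation
function convertDictsToTSV(dicts, keys) {
    return dicts.map(function(dict) {
        return keys.map(function(key) {
            // escape characters that are special in COPY's text format
            return String(dict[key])
                .replace(/\\/g, '\\\\')
                .replace(/\t/g, '\\t')
                .replace(/\n/g, '\\n');
        }).join('\t');
    }).join('\n') + '\n';
}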
Second, the rest of my original .post function:
.post(function(req, res){
    // ...
    /* requires 'stream' and 'pg-copy-streams' as
     * var stream = require('stream');
     * var copyFrom = require('pg-copy-streams').from;
     */
    var read_stream_string = new stream.Readable();
    read_stream_string._read = function noop() {}; // no-op; data is pushed manually
    var keys = [...]; // set of dictionary keys to extract from req.body
    read_stream_string.push(convertDictsToTSV(req.body, keys));
    read_stream_string.push(null); // signal end-of-stream
    pg.connect(connectionString, function(err, client, done) {
        // ...
        // error handling
        // ...
        var copy_string = 'COPY tablename (' + keys.join(',') + ') FROM STDIN';
        var pg_copy_stream = client.query(copyFrom(copy_string));
        read_stream_string.pipe(pg_copy_stream).on('finish', function(finished){
            // handle finished and call done() appropriately
        }).on('error', function(errored){
            // handle errored and call done() appropriately
        });
    });
    pg.end(); // note: this shuts down the pool; in a long-lived server, call done() per request and pg.end() only at shutdown
});
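For what it's worth, here is a sketch of the same flow against the current pg API, using a Pool plus stream.pipeline (Node >= 10) so the client is always released. This assumes an Express-style handler; tablename and connectionString are placeholders:

var stream = require('stream');
var copyFrom = require('pg-copy-streams').from;
var Pool = require('pg').Pool;

var pool = new Pool({ connectionString: connectionString });

// inside the route handler:
pool.connect(function(err, client, release) {
    if (err) { return res.status(500).end(); }
    var source = new stream.Readable();
    source._read = function noop() {};
    source.push(convertDictsToTSV(req.body, keys));
    source.push(null);
    var copy_sql = 'COPY tablename (' + keys.join(',') + ') FROM STDIN';
    stream.pipeline(source, client.query(copyFrom(copy_sql)), function(err) {
        release(); // always return the client to the pool
        if (err) { res.status(500).end(); }
        else { res.json({ inserted: req.body.length }); }
    });
});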