I'm looking to do the following: read data out of Postgres as a stream using pg-query-stream, collect the records into batches (e.g. 1000 at a time), and once a batch is full, insert it using pg-promise's pgp.helpers.insert.
The problem I have is that I can't quite figure out how to get the stream to pause properly so the insert can complete before the stream continues, especially in the on('end') handler.
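For context, generating the multi-row insert itself is the easy part. A minimal sketch of what I mean, with a purely illustrative table and column set (my real ColumnSet comes from elsewhere):

const pgp = require('pg-promise')()

// Illustrative column set describing the destination table
const insertColumnSet = new pgp.helpers.ColumnSet(['first_name', 'last_name', 'email'], { table: 'contacts' })

// helpers.insert turns an array of row objects into a single multi-row INSERT statement
const records = [
  { first_name: 'Ada', last_name: 'Lovelace', email: 'ada@example.com' },
  { first_name: 'Alan', last_name: 'Turing', email: 'alan@example.com' }
]
const insertQuery = pgp.helpers.insert(records, insertColumnSet)
// insertQuery is now a single INSERT INTO "contacts"(...) VALUES (...), (...) statement

The hard part is coordinating that insert with the read stream.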
The code I've tried is below:
const { performance } = require('perf_hooks')
const QueryStream = require('pg-query-stream')

const batchInsertData = (tenant, stream, records, insertColumnSet, options = {}) => {
  stream.pause()
  const t0 = performance.now()
  let query = tenant.db.$config.pgp.helpers.insert(records, insertColumnSet)
  if (options.onConflictExpression) {
    query += options.onConflictExpression
  }
  tenant.db.none(query)
    .then(() => {
      let t1 = performance.now()
      console.log('Inserted ' + records.length + ' records done in ' + ((t1 - t0) / 1000) + ' (seconds).')
      stream.resume()
    })
    .catch(error => {
      throw error
    })
}

module.exports = (tenant, sql, columnSet, recordMapper, options = {}) => {
  try {
    return new Promise((resolve, reject) => {
      const query = new QueryStream(sql)

      // Set options as required
      options.batchSize = parseInt(options.batchSize) || 1000
      options.onConflictExpression = options.onConflictExpression || null

      let records = []
      let batchNumber = 1
      let recordCount = 0
      let t0 = performance.now()

      tenant.db.stream(query, (stream) => {
        stream.on('data', (record) => {
          const mappedRecord = recordMapper(record)
          records.push(mappedRecord)
          recordCount++
          if (records.length === options.batchSize) {
            batchInsertData(tenant, stream, records, columnSet, options)
            records = []
            console.log(`Batch ${batchNumber} done`)
            batchNumber++
          }
        })
        stream.on('end', () => {
          // If any records are left that are not part of a full batch, insert them here.
          if (records.length !== 0) {
            batchInsertData(tenant, stream, records, columnSet, options)
            records = []
            console.log(`Batch ${batchNumber} done`)
            batchNumber++
          }
          console.log('Total Records: ' + recordCount)
          let t1 = performance.now()
          console.log('Duration:', ((t1 - t0) / 1000) + ' (seconds).')
        })
        stream.on('error', (error) => {
          throw error
        })
      })
        .then(data => {
          resolve()
        })
        .catch(error => {
          console.log('ERROR:', error)
          reject(error)
        })
    })
  } catch (err) {
    throw err
  }
}
I'm not sure whether the approach I'm trying is the best one. I've tried a few different things based on the documentation I can find around pg-promise and streams, but had no joy.
Any help/advice is greatly appreciated.
Thanks
Paul
Attempt 2
Below is my second attempt, using getNextData and sequence as per the data imports page. I'm struggling to work out how to hook the stream into it so it only pulls one batch of data at a time before inserting.
const { performance } = require('perf_hooks')
const QueryStream = require('pg-query-stream')

module.exports = (tenant, sql, columnSet, recordMapper, options = {}) => {
  try {
    // Set options as required
    options.batchSize = parseInt(options.batchSize) || 1000
    options.onConflictExpression = options.onConflictExpression || null

    const query = new QueryStream(sql)

    function getNextData(transaction, index) {
      return new Promise(async (resolve, reject) => {
        if (index < options.batchSize) {
          let count = 1
          await transaction.stream(query, async (stream) => {
            let records = []
            await tenant.db.$config.pgp.spex.stream.read.call(transaction, stream, function (streamIndex, streamData) {
              stream.resume()
              count++
              console.log(count, streamIndex, streamData)
              records.push(streamData[0])
              if (records.length === options.batchSize) {
                stream.pause()
                resolve(records)
              }
            }, { readChunks: true })
          })
        }
        resolve(null)
      })
    }

    return tenant.db.tx('massive-insert', (transaction) => {
      return transaction.sequence((index) => {
        return getNextData(transaction, index)
          .then((records) => {
            if (records > 0) {
              let query = tenant.db.$config.pgp.helpers.insert(records, columnSet)
              if (options.onConflictExpression) {
                query += options.onConflictExpression
              }
              const i0 = performance.now()
              return transaction.none(query)
                .then(() => {
                  let i1 = performance.now()
                  console.log('Inserted ' + records.length + ' records done in ' + ((i1 - i0) / 1000) + ' (seconds).')
                })
            }
          })
      })
    })
  } catch (err) {
    throw err
  }
}
I've got this working using a slightly different approach, focused more on using streams directly while still using pg-promise to handle the database side.
const BatchStream = require('batched-stream')
const { performance } = require('perf_hooks')
const { Transform, Writable } = require('stream')

module.exports = async (tenant, sql, columnSet, recordMapper, options = {}) => {
  try {
    // Set options as required
    options.batchSize = parseInt(options.batchSize) || 1000
    options.onConflictExpression = options.onConflictExpression || null

    const query = new tenant.lib.QueryStream(sql)
    const stream = tenant.db.client.query(query)

    return new Promise((resolve, reject) => {
      // We want to process this in batches
      const batch = new BatchStream({ size: options.batchSize, objectMode: true, strictMode: false })

      // We use a write stream to insert each batch into the database
      const insertDatabase = new Writable({
        objectMode: true,
        write(records, encoding, callback) {
          (async () => {
            try {
              /*
                If we have a record mapper then run it here prior to inserting the records.
                Doing it this way is much quicker than doing it as a transform stream in the
                pipeline below, by about 10 seconds for 100,000 records.
              */
              if (recordMapper) {
                records = records.map(record => recordMapper(record))
              }
              let query = tenant.lib.pgp.helpers.insert(records, columnSet)
              if (options.onConflictExpression) {
                query += options.onConflictExpression
              }
              const i0 = performance.now()
              await tenant.db.none(query)
              const i1 = performance.now()
              console.log('Inserted ' + records.length + ' records in ' + ((i1 - i0) / 1000) + ' (seconds).')
            } catch (e) {
              return callback(e)
            }
            callback()
          })()
        }
      })

      // Process the stream
      const t0 = performance.now()
      stream
        // Break it down into batches
        .pipe(batch)
        // Insert those batches into the database
        .pipe(insertDatabase)
        // Once we get here we are done :)
        .on('finish', () => {
          const t1 = performance.now()
          console.log('Finished insert in ' + ((t1 - t0) / 1000) + ' (seconds).')
          resolve()
        })
        .on('error', (error) => {
          reject(error)
        })
    })
  } catch (err) {
    throw err
  }
}
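For completeness, this is roughly how the module gets called. The tenant object, SQL, column set and mapper below are just placeholders for my actual setup:

const pgp = require('pg-promise')()

// Illustrative column set for the destination table
const columnSet = new pgp.helpers.ColumnSet(['id', 'first_name', 'last_name'], { table: 'contacts_copy' })

// Optional mapper applied to each source row before insert
const recordMapper = (row) => ({
  id: row.id,
  first_name: row.firstname,
  last_name: row.lastname
})

const batchInsert = require('./batch-insert') // the module shown above

batchInsert(tenant, 'SELECT id, firstname, lastname FROM contacts', columnSet, recordMapper, {
  batchSize: 1000,
  onConflictExpression: ' ON CONFLICT (id) DO NOTHING'
})
  .then(() => console.log('Copy complete'))
  .catch(err => console.error(err))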