I am trying to write an AWS Lambda function that takes a tar.gz from an S3 bucket, gunzips it and then unpacks it, streaming the extracted files to another S3 bucket.
I have this code:
var AWS = require('aws-sdk');
var fs = require('fs');
var zlib = require('zlib');
var uuid = require('uuid/v4');
var tar = require('tar-stream');
var pack = tar.pack();
var s3 = new AWS.S3();

exports.handler = (event, context, callback) => {
  var bucket = event.Records[0].s3.bucket.name;
  var key = event.Records[0].s3.object.key;
  var file = 'S3://' + bucket + '/' + key;
  console.log(bucket);
  console.log(key);
  var readParams = {
    Bucket: bucket,
    Key: key
  };
  var dataStream = s3.getObject(readParams).createReadStream();
  var extract = tar.extract();
  extract.on('entry', function(header, stream, next) {
    console.log(header.name);
    var writeParams = {
      Bucket: process.env.JOB_PROCESSING_BUCKET,
      Key: uuid() + '-' + header.name,
      Body: stream
    };
    s3.upload(writeParams)
      .on('httpUploadProgress', function(evt) {
        console.log('Progress:', evt.loaded, '/', evt.total);
      })
      .send(function(err, data) {
        if (err) console.log("An error occurred", err);
        else console.log("Uploaded the file at", data.Location);
      });
    stream.on('end', function() {
      next(); // ready for next entry
    });
    stream.resume(); // just auto drain the stream
  });
  extract.on('finish', function() {
    // all entries read
  });
  dataStream.pipe(zlib.createGunzip()).pipe(extract);
  callback(null, 'Gunzip Lambda Function');
};
It pulls the file and sorts out the gunzipping, and I can see each file being extracted on entry. The code then tries to stream each file to S3, but that just creates a 0 KB file, hangs around as if it were reading the stream, and then continues on to the next entry.
Why can't it seem to read/process the stream body? Is there a better way of doing this?
Thanks
I don't know if it's the best solution, but the following code works for me. The difference from your version is that each tar entry is piped into a PassThrough stream, and that PassThrough is what gets handed to s3.upload as the Body, rather than the entry stream itself.
const AWS = require('aws-sdk');
const s3 = new AWS.S3();
const tar = require('tar-stream');
const zlib = require('zlib');
const stream = require('stream');
const uuid = require('uuid');
exports.get = (event, context) => {
  var params = {
    Bucket: event.Records[0].s3.bucket.name,
    Key: event.Records[0].s3.object.key
  };
  var dataStream = s3.getObject(params).createReadStream();
  var extract = tar.extract();
  extract.on('entry', function(header, inputStream, next) {
    inputStream.pipe(uploadFromStream(s3, header, context));
    inputStream.on('end', function() {
      next(); // ready for next entry
    });
    inputStream.resume(); // just auto drain the stream
  });
  extract.on('finish', function() {
    // all entries read
  });
  dataStream.pipe(zlib.createGunzip()).pipe(extract);
};

function uploadFromStream(s3, header, context) {
  // context is passed in from the handler so that it is in scope here
  var pass = new stream.PassThrough();
  var writeParams = {
    Bucket: process.env.JOB_PROCESSING_BUCKET,
    Key: uuid.v1() + '-' + header.name,
    Body: pass
  };
  s3.upload(writeParams, function(err, data) {
    context.done(err, data);
  });
  return pass;
}
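One caveat with the version above: context wasn't originally in scope inside uploadFromStream (hence passing it in), and context.done fires as soon as the first upload completes, which ends the invocation even if the archive contains more entries. If you need to wait for every entry to be uploaded, here is a minimal sketch of one way to do it, assuming a Lambda runtime with async/await support and aws-sdk v2's .promise() helpers: collect the upload promises, wait for the tar stream to finish, then wait for all the uploads.

const AWS = require('aws-sdk');
const s3 = new AWS.S3();
const tar = require('tar-stream');
const zlib = require('zlib');
const stream = require('stream');
const uuid = require('uuid');

exports.handler = async (event) => {
  var params = {
    Bucket: event.Records[0].s3.bucket.name,
    Key: event.Records[0].s3.object.key
  };
  var uploads = [];
  var extract = tar.extract();
  extract.on('entry', function(header, inputStream, next) {
    var pass = new stream.PassThrough();
    // Start the upload and keep its promise so we can wait on it later
    uploads.push(s3.upload({
      Bucket: process.env.JOB_PROCESSING_BUCKET,
      Key: uuid.v1() + '-' + header.name,
      Body: pass
    }).promise());
    inputStream.pipe(pass);
    inputStream.on('end', next); // ready for next entry
  });
  // Wait until the whole archive has been read...
  await new Promise(function(resolve, reject) {
    extract.on('finish', resolve);
    extract.on('error', reject);
    var gunzip = zlib.createGunzip();
    gunzip.on('error', reject);
    s3.getObject(params).createReadStream()
      .pipe(gunzip)
      .pipe(extract);
  });
  // ...and then until every upload has completed
  return Promise.all(uploads);
};

Promise.all rejects if any individual upload fails, so a failed upload surfaces as a function error instead of being silently swallowed.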
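Also worth noting for both snippets: S3 event notifications URL-encode the object key (spaces arrive as +), so keys containing special characters should be decoded before calling getObject, along these lines:

var key = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));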