
Lambda - Import CSV from S3 to RDS MySQL

I have a Lambda function that imports a specific CSV file from S3 into MySQL. However, the CSV file is around 1 GB, and when I run this code, it doesn't finish processing and times out.

//s3 to rds
const fs = require("fs");
const AWS = require('aws-sdk');
var mysql = require('mysql');
var config = require('./config.json');
const s3 = new AWS.S3({
  accessKeyId: 'XXXXXXXXXXXXXXX',
  secretAccessKey: 'XXXXXXXXXXXXXXXXXXXXXXXXXXxx'
});
var filePath = `localfilepath`;

var pool = mysql.createPool({
  host: config.dbhost,
  user: config.dbuser,
  password: config.dbpassword,
  database: config.dbname
});
pool.getConnection((err, connection) => {
  if (err) throw err;
  console.log("Connected!" + connection);

  var s3Params = {
    Bucket: '<your_bucket_name>',
    Key: '<your_key>'
  };
  s3.getObject(s3Params, function(err, result) {
    if (err) {
      throw new Error(err);
    } else {
      console.log('file stored successfully', result);
      fs.createWriteStream(filePath).write(result.Body);
      connection.query('TRUNCATE TABLE <table_name>', (err, result) => {
        if (err) {
         throw new Error(err);
        } else {
          console.log('table truncated');
          var query = `LOAD DATA LOCAL INFILE '<file_name>' INTO table <table_name> FIELDS TERMINATED BY ','  ENCLOSED BY '"' IGNORE 1 LINES `;
          connection.query(query, function(err, result) {
            if (err) throw err;
            console.log("Result: " + result);
            connection.release();
            fs.unlinkSync(filePath);
            console.log('file deleted');
          });
        }
      });
    }

  });
})

How can I make this work?

dang asked Jan 24 '20 16:01


3 Answers

According to this thread, they do expect to implement this at some point; however, when that will happen is anyone's guess.

AWS Lambda currently has a "hard limit" of 512 MB of disk space in the /tmp directory (as stated here), so the fs.createWriteStream(filePath).write(result.Body); line should fail here because the file is 1 GB in size. The error would be something along the lines of "no space left on device" (judging from existing threads).

However, loading the file from S3 should work in this case. Lambda allocates CPU in proportion to the configured memory, so the function could be timing out because too little memory is configured (depending on what you have set). This link gives a good indication of what you need to set (in relation to how much you're loading into memory versus onto disk).

What I would suggest is splitting the stream into 512 MB blocks (this package may help) at this stage and storing them in S3 separately. That way you can split this operation into two functions:

  1. Fetching the data and splitting it into separate S3 files (also truncating your table).
  2. Loading the CSV data back into your RDS from S3.

(You can use CloudWatch Events for this.)
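As a rough sketch of the splitting step, the generator below groups CSV lines into parts of at most a given byte size and cuts only at line boundaries. The names readLines and splitCsvByLines are hypothetical helpers, not part of any package. It assumes the source is an async iterable of Buffers (for example, the stream from s3.getObject(params).createReadStream() in aws-sdk v2) and that no quoted field contains an embedded newline.

```javascript
const { StringDecoder } = require('string_decoder');

// Hypothetical helper: yields complete lines from an async iterable of Buffers.
// StringDecoder keeps multi-byte UTF-8 characters intact across chunk boundaries.
async function* readLines(source) {
  const decoder = new StringDecoder('utf8');
  let pending = ''; // trailing partial line carried over to the next chunk
  for await (const chunk of source) {
    const lines = (pending + decoder.write(chunk)).split('\n');
    pending = lines.pop();
    yield* lines;
  }
  pending += decoder.end();
  if (pending.length > 0) yield pending;
}

// Hypothetical helper: groups lines into parts of at most maxBytes each.
// A single line longer than maxBytes still becomes its own (oversized) part.
async function* splitCsvByLines(source, maxBytes) {
  let part = [];
  let partBytes = 0;
  for await (const line of readLines(source)) {
    const size = Buffer.byteLength(line, 'utf8') + 1; // +1 for the newline
    if (part.length > 0 && partBytes + size > maxBytes) {
      yield part.join('\n') + '\n';
      part = [];
      partBytes = 0;
    }
    part.push(line);
    partBytes += size;
  }
  if (part.length > 0) yield part.join('\n') + '\n';
}
```

Each yielded part could then be uploaded as its own S3 object. Note that only the first part contains the header row, so IGNORE 1 LINES would apply to that part alone.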

Rohan Hadnum answered Nov 15 '22 16:11


You basically have two obstacles to overcome: 1) local storage on Lambda is only 512 MB, and 2) Lambda has a maximum execution time of 15 minutes (the default timeout is much lower, so you have to raise it explicitly on your function).

To solve problem 1, you can use S3 Select. It lets you run SQL queries against objects (CSV and JSON files) in S3. Run the S3 Select query on your CSV file, and for every record you retrieve, push it onto a queue and have other workers insert the records into the database. You can also insert directly into your RDS, but it may be slower.

Here's a code sample:

const AWS = require('aws-sdk');

const S3 = new AWS.S3();

exports.handler = async (event, context) => {
    try {
        const query = "SELECT * FROM s3object s WHERE s.id > '0'";
        const bucket = 'my-bucket';
        const key = 'data.csv';

        const params = {
            Bucket: bucket,
            Key: key,
            ExpressionType: 'SQL',
            Expression: query,
            InputSerialization: { CSV: { FileHeaderInfo: 'USE' } },
            OutputSerialization: { CSV: {} }
        }

        // An async handler reports success by returning and failure by throwing.
        return await getDataUsingS3Select(params);
    } catch (error) {
        console.error(error);
        throw error;
    }
};

const getDataUsingS3Select = async (params) => {
    return new Promise((resolve, reject) => {
        S3.selectObjectContent(params, (err, data) => {
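            // Return here so we never touch data.Payload after a failed request
            if (err) { return reject(err); }

            // data.Payload is a stream of events
            data.Payload.on('data', (event) => {
                // Records events carry a chunk of the result, which may hold several rows
                if (event.Records) {
                    // do what you want with payload: send to a queue or direct to db
                    console.log('Rows:', event.Records.Payload.toString('utf8'));
                }
            }).on('error', (streamErr) => {
                // surface stream failures instead of leaving the promise pending
                reject(streamErr);
            }).on('end', () => {
                // we arrive here after processing everything
                resolve();
            });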
        });
    })
}
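One detail about the sample above: as far as I understand the S3 Select event stream, each Records payload is a chunk of bytes that can end in the middle of a row. A minimal sketch of reassembling complete rows and handing them over in batches could look like this. createRowBatcher is a hypothetical helper, not part of the AWS SDK, and onBatch stands in for whatever sends rows to a queue or the database.

```javascript
const { StringDecoder } = require('string_decoder');

// Hypothetical helper: collects complete CSV rows from arbitrary byte chunks
// and calls onBatch(rows) every time batchSize rows have accumulated.
function createRowBatcher(batchSize, onBatch) {
  const decoder = new StringDecoder('utf8');
  let pending = ''; // partial row left over from the previous chunk
  let batch = [];

  const addLine = (line) => {
    if (line.length === 0) return; // skip blank lines
    batch.push(line);
    if (batch.length >= batchSize) {
      onBatch(batch);
      batch = [];
    }
  };

  return {
    // Feed one chunk, e.g. event.Records.Payload from the 'data' handler.
    push(chunk) {
      const lines = (pending + decoder.write(chunk)).split('\n');
      pending = lines.pop();
      lines.forEach(addLine);
    },
    // Call once from the 'end' handler to flush whatever is left.
    end() {
      addLine(pending + decoder.end());
      pending = '';
      if (batch.length > 0) {
        onBatch(batch);
        batch = [];
      }
    },
  };
}
```

In the handler above, batcher.push(event.Records.Payload) would replace the console.log call, and batcher.end() would run just before resolve(). This sketch calls onBatch synchronously; if the inserts are asynchronous, you would also need to track the pending work before resolving.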

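If you're still going over the 15-minute time limit, that's problem 2. First add a LIMIT clause to the SQL. Then you can create a "checkpoint" file in Lambda's /tmp directory and save the id of the last record you processed there, so that when you re-run your Lambda function it can read that file, pick up the id, and use it in your query's WHERE clause. Keep in mind that /tmp survives only while the same execution environment is reused, so a more durable store (an S3 object, for example) is safer for the checkpoint. The query would then look like: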

select * from s3object s where s.id > '99' limit 50000
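
The checkpoint idea can be sketched roughly as follows. The file name and the readCheckpoint, writeCheckpoint, and buildQuery helpers are hypothetical. The sketch assumes id is an integer column and that the rows in the file are ordered by id. The CAST is my own addition: with header-based CSV input the values are read as strings, so a plain string comparison would order '100' before '99'.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical checkpoint location; on Linux os.tmpdir() is /tmp unless TMPDIR is set.
const CHECKPOINT_FILE = path.join(os.tmpdir(), 's3-import-checkpoint.json');

// Returns the last processed id, or 0 when no checkpoint exists yet.
function readCheckpoint(file = CHECKPOINT_FILE) {
  try {
    return JSON.parse(fs.readFileSync(file, 'utf8')).lastId;
  } catch (err) {
    if (err.code === 'ENOENT') return 0; // first run: start from the beginning
    throw err;
  }
}

// Stores the id of the last row that was processed successfully.
function writeCheckpoint(lastId, file = CHECKPOINT_FILE) {
  fs.writeFileSync(file, JSON.stringify({ lastId }));
}

// Builds the S3 Select expression for the next slice of rows.
// Only integers are interpolated, so nothing user-supplied reaches the SQL.
function buildQuery(lastId, limit) {
  if (!Number.isInteger(lastId) || !Number.isInteger(limit)) {
    throw new TypeError('lastId and limit must be integers');
  }
  return `SELECT * FROM s3object s WHERE CAST(s.id AS INT) > ${lastId} LIMIT ${limit}`;
}
```

A run would call buildQuery(readCheckpoint(), 50000), process the returned rows, and then call writeCheckpoint with the largest id it saw.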

aljo f answered Nov 15 '22 17:11


If your main goal is to import data from a CSV file on S3 into RDS MySQL, check out AWS Data Pipeline. Its "Load S3 Data into Amazon RDS MySQL Table" template already defines all the resources you need for this common task; however, it uses an EC2 instance. On the other hand, the resulting solution is easier to scale and maintain.

Yann answered Nov 15 '22 18:11