 

Exponential Backoff when BatchWrite to DynamoDB in NodeJS

I have working code that reads a CSV file from S3, groups every 25 rows into a BatchWriteItem DynamoDB request, and sends it. The BatchWrite often returns success with UnprocessedItems containing some of the items (not all 25). A subsequent resubmit may also fail, partially or completely. I wanted to implement exponential backoff when sending the follow-up requests, but every library I have found assumes the retried task is identical. In my case, the items may or may not be the same as the ones in the previous request.

I am not very familiar with Node.js. Is there a library or pattern for retrying a task whose context changes between attempts?

I am using AWS Lambda, so I cannot rely on global variables.

Helper function writing to DDB with 1 retry:

// Batch write to DDB, retrying unprocessed items once
var AWS = require("aws-sdk");
var util = require("util");
var dynamodb = new AWS.DynamoDB();

function batchWriteDDB(params) {
  dynamodb.batchWriteItem(params, function(err, data) {
    if (err) {
      console.error("Batchwrite failed: " + err, err.stack);
    } else {
      var unprocessed = data.UnprocessedItems;
      if (Object.keys(unprocessed).length === 0) {
        console.log("Processed all items.");
      } else {
        // Some items were not processed: resubmit only those
        console.warn("Batchwrite did not complete: " + util.inspect(unprocessed, { showHidden: false, depth: null }));
        console.log("Retrying batchwrite...");
        var params2 = { RequestItems: data.UnprocessedItems };
        dynamodb.batchWriteItem(params2, function(error, data2) {
          if (error) {
            console.error("Retry failed: " + error, error.stack);
          } else {
            var unprocessed2 = data2.UnprocessedItems;
            if (Object.keys(unprocessed2).length === 0) {
              console.log("Retry processed all items.");
            } else {
              console.error("Failed AGAIN to complete: " + util.inspect(unprocessed2, { showHidden: false, depth: null }));
            }
          }
        });
      }
    }
  });
}
asked Mar 20 '17 by derrdji

People also ask

What is exponential backoff in DynamoDB?

In addition to simple retries, each AWS SDK implements an exponential backoff algorithm for better flow control. The concept behind exponential backoff is to use progressively longer waits between retries for consecutive error responses.
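As a minimal illustration (a hypothetical helper, not part of any SDK), the delay for each attempt is typically computed by doubling a base delay per retry:

function backoffDelay(retryCount, baseMs) {
  // e.g. baseMs = 300 gives 300, 600, 1200, 2400, ... milliseconds
  return baseMs * Math.pow(2, retryCount);
}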

Does Batchwrite overwrite?

Yes. BatchWriteItem always overwrites: a PutRequest replaces any existing item that has the same primary key.

When to use BatchWriteItem?

With BatchWriteItem , you can efficiently write or delete large amounts of data, such as from Amazon EMR, or copy data from another database into DynamoDB. In order to improve performance with these large-scale operations, BatchWriteItem does not behave in the same way as individual PutItem and DeleteItem calls would.
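For illustration, a sketch of a put-style request using hypothetical table and attribute names (each call accepts up to 25 put/delete requests):

var params = {
  RequestItems: {
    "MyTable": [
      { PutRequest: { Item: { id: { S: "item-1" }, price: { N: "42" } } } },
      { PutRequest: { Item: { id: { S: "item-2" }, price: { N: "7" } } } }
    ]
  }
};
dynamodb.batchWriteItem(params, function(err, data) { /* handle UnprocessedItems */ });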

How do I delete multiple items in DynamoDB?

With the DynamoDB API, you use the DeleteItem action to delete data from a table, one item at a time. You must specify the item's primary key values. In addition to DeleteItem , Amazon DynamoDB supports a BatchWriteItem action for deleting multiple items at the same time.
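Sketching the delete case with the same hypothetical names, each entry is a DeleteRequest keyed on the item's primary key:

var params = {
  RequestItems: {
    "MyTable": [
      { DeleteRequest: { Key: { id: { S: "item-1" } } } },
      { DeleteRequest: { Key: { id: { S: "item-2" } } } }
    ]
  }
};
dynamodb.batchWriteItem(params, function(err, data) { /* handle UnprocessedItems */ });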


2 Answers

The AWS SDK has built-in support for exponential backoff and retries, which you can configure.

Set the base retry delay for all services to 300 ms:

AWS.config.update({retryDelayOptions: {base: 300}});
// Delays with maxRetries = 3: 300, 600, 1200

Set a custom backoff function to provide delay values on retries:

AWS.config.update({retryDelayOptions: {customBackoff: function(retryCount) {
  // returns delay in ms
}}});
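As a sketch (the 100 ms base and the jitter are assumptions, not SDK defaults), an exponential customBackoff with full jitter could look like:

AWS.config.update({
  retryDelayOptions: {
    customBackoff: function(retryCount) {
      var delay = 100 * Math.pow(2, retryCount); // 100, 200, 400, ... ms
      return Math.random() * delay;              // full jitter; return value is in ms
    }
  }
});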

To set the retry count specifically for the DynamoDB service:

var dynamodb = new AWS.DynamoDB({maxRetries: 5});

To configure both properties for DynamoDB:

  • maxRetries = 5
  • base delay = 300 ms

var dynamodb = new AWS.DynamoDB({maxRetries: 5, retryDelayOptions: {base: 300}});

See the AWS SDK documentation for the maxRetries property.

answered Sep 20 '22 by notionquest


Below is a recursive way of handling unprocessed items written to DynamoDB.

var AWS = require('aws-sdk'); // assumes AWS SDK v2 and a configured "logger"

var batchWrite = function (items, table, callback) {
    var params = { RequestItems: {} };
    logger.info('batchWrite initial length of items: ' + items.length);
    table = table || 'Merchants';
    params['RequestItems'][table] = [];

    var attempt = 0;
    var batchCount = 0;
    while (items.length > 0) {

        // Pull off up to 25 items from the list
        for (var i = params['RequestItems'][table].length; i < 25; i++) {

            // Nothing else to add to the batch if the input list is empty
            if (items.length === 0) {
                break;
            }

            // Take an item from the list and add its request to this batch
            var item = items.pop();
            params['RequestItems'][table].push(item);
        }
        // Kick off this batch of requests
        logger.info("Calling BatchWriteItem with a new batch of "
            + params['RequestItems'][table].length + " items, batchCount = " + batchCount);
        let dynamo = new AWS.DynamoDB({ apiVersion: '2012-08-10' });
        dynamo.batchWriteItem(params, doBatchWriteItem);

        // Initialize a new blank params variable for the next batch
        params = { RequestItems: {} };
        params['RequestItems'][table] = [];
        batchCount++;
    }

    // A callback that repeatedly calls BatchWriteItem until all of the writes have completed
    function doBatchWriteItem(err, data) {
        batchCount--;
        if (err) {
            logger.info(err); // an error occurred
            if (batchCount === 0) {
                callback(err, data);
            }
        } else {
            console.dir(data);
            if (('UnprocessedItems' in data) && (table in data.UnprocessedItems)) {
                // More data. Call again with the unprocessed items.
                var params = {
                    RequestItems: data.UnprocessedItems
                };
                attempt++;
                batchCount++;
                logger.info("Calling BatchWriteItem again to retry "
                    + params['RequestItems'][table].length + " UnprocessedItems in " + (10 * attempt) + " seconds");
                logger.info("batchCount increased to " + batchCount);
                setTimeout(function () {
                    let dynamo = new AWS.DynamoDB({ apiVersion: '2012-08-10' });

                    dynamo.batchWriteItem(params, doBatchWriteItem);
                }, 10000 * attempt);
            } else {
                logger.info("BatchWriteItem processed all items in the batch, batchCount = " + batchCount);
                if (batchCount === 0) {
                    logger.info("batchWrite processed all batches");
                    callback(null, data);
                }
            }
        }
    }
}

Call the batchWrite function with the collection of request items, the table name, and a callback.

batchWrite(collection, 'your-table-name', (err, data) => {
   if (err) {
        logger.info('error');
        return;
   }
   logger.info('success');
});
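Note that this code retries with a linearly growing delay (10 seconds × attempt). As a sketch, the setTimeout delay could be made exponential instead, reusing the answer's own params and doBatchWriteItem:

setTimeout(function () {
    let dynamo = new AWS.DynamoDB({ apiVersion: '2012-08-10' });
    dynamo.batchWriteItem(params, doBatchWriteItem);
}, 10000 * Math.pow(2, attempt - 1)); // 10 s, 20 s, 40 s, ...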
answered Sep 20 '22 by Desmond