I have working code that reads a CSV file from S3, groups every 25 rows into a BatchWriteItem DynamoDB request, and sends it. The BatchWrite often returns success with UnprocessedItems containing a partial set of the items (not all 25). A subsequent resubmit may also fail, partially or completely. I wanted to implement exponential backoff for these follow-up requests, but every library I have found assumes the retried tasks are identical. In my case, the items may or may not be the same as the ones in the previous request.
I am not very familiar with Node.js. Is there any library or way to implement retried tasks with a (different) context?
I am using AWS Lambda, so I cannot rely on global variables.
Here is my helper function that writes to DDB with one retry:
// batchwrite to DDB
var AWS = require('aws-sdk');
var util = require('util');
var dynamodb = new AWS.DynamoDB();

function batchWriteDDB(params) {
    dynamodb.batchWriteItem(params, function (err, data) {
        if (err) {
            console.error("Batchwrite failed: " + err, err.stack);
        } else {
            var unprocessed = data.UnprocessedItems;
            if (Object.keys(unprocessed).length === 0) {
                console.log("Processed all items.");
            } else {
                // some unprocessed items, do it again
                console.warn("Batchwrite did not complete: " + util.inspect(unprocessed, { showHidden: false, depth: null }));
                console.log("Retry batchwriting...");
                var params2 = { RequestItems: data.UnprocessedItems };
                dynamodb.batchWriteItem(params2, function (error, data2) {
                    if (error) {
                        console.error("Retry failed: " + error, error.stack);
                    } else {
                        var unprocessed2 = data2.UnprocessedItems;
                        if (Object.keys(unprocessed2).length === 0) {
                            console.log("Retry processed all items.");
                        } else {
                            console.error("Failed AGAIN to complete: " + util.inspect(unprocessed2, { showHidden: false, depth: null }));
                        }
                    }
                });
            }
        }
    });
}
In addition to simple retries, each AWS SDK implements an exponential backoff algorithm for better flow control. The concept behind exponential backoff is to use progressively longer waits between retries for consecutive error responses.
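Applied to this question, the same idea can be written by hand: each retry's payload is whatever the previous response left in UnprocessedItems, so the "context" naturally changes between attempts. Below is a minimal sketch of that pattern; batchWriteWithBackoff, the 100 ms base delay, and the attempt counters are my own illustrative names, not part of any library:
var AWS = require('aws-sdk');
var dynamodb = new AWS.DynamoDB();

// Recursively resubmit UnprocessedItems with exponential backoff.
// The next request is always built from the previous response, so
// the retried items may differ from attempt to attempt.
function batchWriteWithBackoff(params, attempt, maxAttempts, done) {
    dynamodb.batchWriteItem(params, function (err, data) {
        if (err) {
            return done(err); // hard failure, give up immediately
        }
        var unprocessed = data.UnprocessedItems;
        if (Object.keys(unprocessed).length === 0) {
            return done(null); // everything was written
        }
        if (attempt >= maxAttempts) {
            return done(new Error('Gave up with items still unprocessed'));
        }
        var delay = 100 * Math.pow(2, attempt); // 100, 200, 400, ... ms
        setTimeout(function () {
            batchWriteWithBackoff({ RequestItems: unprocessed }, attempt + 1, maxAttempts, done);
        }, delay);
    });
}
Because the next request is rebuilt from data.UnprocessedItems on every attempt, no global state is required, which also works within a single Lambda invocation.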
Chen's answer is correct; BatchWriteItem always overwrites.
With BatchWriteItem, you can efficiently write or delete large amounts of data, such as from Amazon EMR, or copy data from another database into DynamoDB. In order to improve performance with these large-scale operations, BatchWriteItem does not behave in the same way as individual PutItem and DeleteItem calls would.
With the DynamoDB API, you use the DeleteItem action to delete data from a table, one item at a time. You must specify the item's primary key values. In addition to DeleteItem, Amazon DynamoDB supports a BatchWriteItem action for deleting multiple items at the same time.
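For illustration, a single BatchWriteItem call can carry both PutRequest and DeleteRequest entries; the table name and keys below are made up:
var params = {
    RequestItems: {
        'MyTable': [  // hypothetical table name
            { DeleteRequest: { Key: { id: { S: 'item-1' } } } },
            { DeleteRequest: { Key: { id: { S: 'item-2' } } } },
            // Put and Delete requests can be mixed in one batch
            { PutRequest: { Item: { id: { S: 'item-3' }, name: { S: 'example' } } } }
        ]
    }
};
dynamodb.batchWriteItem(params, function (err, data) { /* handle UnprocessedItems as above */ });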
The AWS SDK supports an exponential back-off and retry mechanism. You can configure it.
Set the base retry delay for all services to 300 ms
AWS.config.update({retryDelayOptions: {base: 300}});
// Delays with maxRetries = 3: 300, 600, 1200
Set a custom backoff function to provide delay values on retries
AWS.config.update({retryDelayOptions: {customBackoff: function(retryCount) {
  // returns the delay in ms; for example 100, 200, 400, ...
  return 100 * Math.pow(2, retryCount);
}}});
Specifically configuring the AWS DynamoDB service:
var dynamodb = new AWS.DynamoDB({maxRetries: 5});
Or with a custom base delay as well:
var dynamodb = new AWS.DynamoDB({maxRetries: 5, retryDelayOptions: {base: 300}});
See the maxRetries property in the AWS.Config documentation for details.
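One caveat worth spelling out: these SDK-level retries only fire when a request fails outright (for example on throttling or 5xx errors). A response that succeeds but carries UnprocessedItems is not retried by the SDK, so application code still has to resubmit those, as in the snippets above. If you want jitter on top of the exponential delay (a common variation, not an AWS-specific recipe), it plugs into the same customBackoff hook:
var AWS = require('aws-sdk');

var dynamodb = new AWS.DynamoDB({
    maxRetries: 5,
    retryDelayOptions: {
        // Full jitter: random delay between 0 and an exponentially growing cap.
        customBackoff: function (retryCount) {
            return Math.random() * 100 * Math.pow(2, retryCount); // ms
        }
    }
});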
Below is a recursive way of handling unprocessed items written to DynamoDB.
var AWS = require('aws-sdk');
var logger = console; // or swap in your preferred logging library

var batchWrite = function (items, table, callback) {
    logger.info('batchWrite initial length of items: ' + items.length);
    table = table || 'Merchants';
    var attempt = 0;
    var batchCount = 0;
    while (items.length > 0) {
        // Build a fresh params object for each batch so requests that are
        // already in flight are not affected when the next batch is built
        var params = { RequestItems: {} };
        params['RequestItems'][table] = [];
        // Pull off up to 25 items from the list
        for (var i = 0; i < 25 && items.length > 0; i++) {
            // Take an item from the list and add it to the batch
            var item = items.pop();
            params['RequestItems'][table].push(item);
        }
        // Kick off this batch of requests
        logger.info("Calling BatchWriteItem with a new batch of "
            + params['RequestItems'][table].length + " items, batchCount = " + batchCount);
        let dynamo = new AWS.DynamoDB({ apiVersion: '2012-08-10' });
        dynamo.batchWriteItem(params, doBatchWriteItem);
        batchCount++;
    }
    // A callback that repeatedly calls BatchWriteItem until all of the writes have completed
    function doBatchWriteItem(err, data) {
        batchCount--;
        if (err) {
            logger.info(err); // an error occurred; only the last batch's error reaches the callback
            if (batchCount === 0) {
                callback(err, data);
            }
        } else {
            console.dir(data);
            if (('UnprocessedItems' in data) && (table in data.UnprocessedItems)) {
                // More data. Call again with the unprocessed items.
                var params = {
                    RequestItems: data.UnprocessedItems
                };
                attempt++;
                batchCount++;
                logger.info("Calling BatchWriteItem again to retry "
                    + params['RequestItems'][table].length + " UnprocessedItems in " + (10 * attempt) + " seconds");
                logger.info("batchCount increased to " + batchCount);
                setTimeout(function () {
                    let dynamo = new AWS.DynamoDB({ apiVersion: '2012-08-10' });
                    dynamo.batchWriteItem(params, doBatchWriteItem);
                }, 10000 * attempt);
            } else {
                logger.info("BatchWriteItem processed all items in the batch, batchCount = " + batchCount);
                if (batchCount === 0) {
                    logger.info("batchWrite processed all batches");
                    callback(null, data);
                }
            }
        }
    }
}
Call the batchWrite function with the collection of items and the table name:
batchWrite(collection, 'your-table-name', (err, data) => {
    if (err) {
        logger.info('error: ' + err);
        return;
    }
    logger.info('success');
});