var _ = require('underscore'); var async = require('async'); var assert = require('assert'); var awssum = require('awssum'); var clone = require('clone'); var retry = require('retry'); var util = require('util'); var winston = require('winston'); var env = process.env; if (typeof process.env.ACCESS_KEY_ID === 'undefined') { winston.error('You must specify an AWS Access Key ID.'); process.exit(1); } if (typeof process.env.SECRET_ACCESS_KEY === 'undefined') { winston.error('You must specify an AWS Secret Access Key.'); process.exit(1); } if (typeof process.env.ACCOUNT_ID === 'undefined') { winston.error('You must specifiy an AWS Account ID.'); process.exit(1); } var accessKeyId = env.ACCESS_KEY_ID; var secretAccessKey = env.SECRET_ACCESS_KEY; var awsAccountId = env.ACCOUNT_ID; var amazon = awssum.load('amazon/amazon'); var DynamoDB = awssum.load('amazon/dynamodb').DynamoDB; var ddb = new DynamoDB({ 'accessKeyId': accessKeyId, 'secretAccessKey': secretAccessKey, 'region': amazon.US_EAST_1, }); // prefix attributes with S (for String) or N (for Number) var formatItem = function(item) { var itemToFormat = clone(item); _.map(itemToFormat, function(value, key, obj) { if (_.isString(value)) { obj[key] = { S: value }; } else if (_.isNumber(value)) { obj[key] = { N: value.toString(), }; } else { delete obj[key]; } }); return itemToFormat; }; var importItem = function(formattedItem, tableName, callback) { if (!_.isNull(formattedItem)) { pendingItems.RequestItems[tableName].push({ 'PutRequest' : { 'Item' : formattedItem, }, }); } if (pendingItems.RequestItems[tableName].length === 25 || (pendingItems.RequestItems[tableName].length > 0 && _.isNull(formattedItem))) { var operation = retry.operation({ retries: 5, factor: 3, minTimeout: 5000, maxTimeout: 60 * 1000, randomize: true, }); operation.attempt(function() { ddb.BatchWriteItem(pendingItems, function(err, response) { if (operation.retry(err)) { winston.warn(util.inspect(err)); winston.warn('Retrying to import the items...'); } else if (err) { winston.error('The import failed!'); callback(operation.mainError()); } else { if (_.isUndefined(response.Body.UnprocessedItems[tableName])) { pendingItems.RequestItems[tableName] = []; } else { // we are assuming that there are at most 24 unprocessed items pendingItems.RequestItems[tableName] = response.Body.UnprocessedItems[tableName]; } callback(null); } }); }); } else { callback(null); } }; var range2d = function(x, y) { var values = []; for (var i = 0; i < x; i++) { for (var j = 0; j < y; j++) { values.push([i, j]); } } return values; } // pending items are shared between calls to importItem var pendingItems = { 'RequestItems': { 'myData': [], }, }; var pendingFunctions = _.map(range2d(10, 100), function(i) { return function(callback) { winston.debug('Preparing ' + JSON.stringify(i) + '...'); var formattedItem = formatItem({appId: i[0], ts: i[1]}); importItem(formattedItem, 'myData', callback); }; }); async.series(pendingFunctions, function(err) { if (err) { winston.error(util.inspect(err)); process.exit(1); } // flush any remaining pending items async.whilst(function() { return pendingItems.RequestItems['myData'].length > 0; }, function(callback) { importItem(null, 'myData', callback); }, function(err) { if (err) { winston.error(util.inspect(err)); process.exit(1); } }); });