'use strict';

var _ = require('lodash'),
    async = require('async'),
    Bluebird = require('bluebird'),
    mongodb = Bluebird.promisifyAll(require('mongodb')),
    using = Bluebird.using;

// Number of cursors that walk the collection in parallel.
var concurrency = 3;

// Running tallies, printed once every cursor has been drained.
var totalCount = 0;   // documents processed across all cursors
var cursorCount = {}; // documents processed per cursor, keyed by cursor.id

/**
 * Handles a single document: bumps the per-cursor and global counters.
 *
 * @param {Object} item - The document read from the cursor (unused here).
 * @param {Object} cursor - The cursor that produced the item; its `id`
 *   property selects the per-cursor counter.
 * @returns {undefined} Nothing. loopCursor also accepts a promise as the
 *   return value and waits for it before fetching the next document.
 */
function processItem (item, cursor) {
  var id = cursor.id;
  if (!cursorCount[id]) cursorCount[id] = 0;
  cursorCount[id]++;
  totalCount++;
  // To simulate slow asynchronous work, return a promise here,
  // e.g. `return Bluebird.delay(500);`.
}

/**
 * Opens a connection to the local `migrate_test` database.
 *
 * @returns {Disposer} A Bluebird disposer meant to be passed to `using`,
 *   which closes the connection once the `using` handler has settled.
 */
function getConnectionAsync () {
  var url = 'mongodb://localhost:27017/migrate_test';
  return mongodb.MongoClient.connectAsync(url)
    .disposer(function (connection) {
      connection.close();
    });
}

/**
 * Drains a cursor one document at a time, calling `iterator` for each one.
 *
 * @param {Object} cursor - Cursor exposing `hasNext(callback)` and a
 *   promise-returning `next()`.
 * @param {Function} iterator - Called as `iterator(item, cursor)`. May return
 *   a promise; the next document is not fetched until it settles.
 * @returns {Promise} Resolves when the cursor is exhausted; rejects with the
 *   first error reported by `hasNext`, `next` or `iterator`.
 */
function loopCursor (cursor, iterator) {
  return Bluebird.fromCallback(function (done) {
    async.during(function (test) {
      cursor.hasNext(test);
    }, function (next) {
      Bluebird.resolve(cursor.next())
        .then(function (item) {
          return iterator(item, cursor);
        })
        .then(function () {
          // Call with no arguments: the iterator's resolved value must not
          // land in the node-style error slot of the callback.
          next();
        }, next); // forward rejections so the loop fails instead of hanging
    }, done);
  });
}

// Split the `filters` collection into `concurrency` pages, drain each page
// with its own cursor in parallel, then report how much each cursor handled.
using(getConnectionAsync(), function (connection) {
  return connection.collectionAsync('filters')
    .then(function (collection) {
      return collection.count().then(function (count) {
        // Round up and never go below 1: a page size of 0 would call
        // limit(0), which MongoDB treats as "no limit", so every cursor
        // would walk the whole collection and process documents repeatedly.
        var pageSize = Math.max(1, Math.ceil(count / concurrency));

        var promises = _.times(concurrency, function (n) {
          // Sort on _id so that skip/limit pages are taken from one stable
          // ordering and cannot overlap or miss documents.
          var cursor = collection.find().sort({ _id: 1 }).skip(pageSize * n);
          cursor.id = _.uniqueId();
          cursor.collection = collection;
          // The last cursor stays unbounded so it picks up any remainder.
          if (n !== concurrency - 1) {
            cursor.limit(pageSize);
          }
          return loopCursor(cursor, processItem);
        });

        return Bluebird.all(promises);
      });
    });
}).then(function () {
  _.each(cursorCount, function (count, id) {
    console.log('Cursor %s Processed: %s', id, count);
  });
  console.log('Total Processed: %s', totalCount);
  console.log('Migration Complete');
}).catch(function (err) {
  // Surface failures instead of leaving an unhandled rejection.
  console.error('Migration Failed:', err);
  process.exitCode = 1;
});