Skip to content

Instantly share code, notes, and snippets.

@tanzim
Created July 4, 2016 18:42
Show Gist options
  • Save tanzim/a0d04580698c9e2b8d27e1b182a48a0c to your computer and use it in GitHub Desktop.
Save tanzim/a0d04580698c9e2b8d27e1b182a48a0c to your computer and use it in GitHub Desktop.
Streaming data from MongoDB
'use strict';
const mongodb = require('mongodb');
// Defaults applied to every MongoDB connection opened through this module.
const POOL_SIZE = 10;
const OPTION_AUTO_RECONNECT = true;
const OPTION_SOCKET_OPTIONS = { keepAlive: 1, connectTimeoutMS: 30000 };

// The `server` and `replset` sections carry identical settings, so both are
// produced by one builder. Each call reuses the same OPTION_SOCKET_OPTIONS
// object rather than copying it.
const buildTopologyDefaults = () => ({
  poolSize: POOL_SIZE,
  auto_reconnect: OPTION_AUTO_RECONNECT,
  socketOptions: OPTION_SOCKET_OPTIONS
});

const DEFAULT_MONGODB_CONNECTION_OPTIONS = {
  server: buildTopologyDefaults(),
  replset: buildTopologyDefaults()
};
/**
 * Opens a MongoDB connection using this module's default connection options.
 *
 * @param {string} url - Connection string handed to `MongoClient.connect`.
 * @param {Function} callback - Passed through unchanged to
 *   `MongoClient.connect`; the caller in this gist uses it as `(error, db)`.
 */
function create(url, callback) {
  const { MongoClient } = mongodb;
  MongoClient.connect(url, DEFAULT_MONGODB_CONNECTION_OPTIONS, callback);
}
// Public surface of the connection module: only the `create` factory.
const connectionApi = { create };

module.exports = connectionApi;
'use strict';
const connection = require('./connection');
/**
 * Consumes a stream of documents and signals completion through `callback`.
 *
 * Deliberately named `processStream` rather than `process`: a function called
 * `process` shadows Node's global `process` object inside this module, which
 * makes the `process.env.DB_URL` lookup below throw.
 *
 * @param {object} stream - Emitter of 'data', 'end' and 'error' events.
 * @param {Function} callback - Invoked at most once: with no arguments when
 *   the stream ends, or with the error when the stream emits 'error'.
 */
function processStream(stream, callback) {
  let settled = false;

  // Guard so the callback (and the db.close() it triggers) runs only once,
  // even if the stream emits both 'error' and 'end'.
  const settle = (...args) => {
    if (settled) {
      return;
    }
    settled = true;
    callback(...args);
  };

  stream
    .on('data', (document) => {
      // Do something with the document
    })
    .on('end', () => settle())
    .on('error', (error) => settle(error));
}

// Entry point: connect, stream the whole `accounts` collection ordered by
// `_id`, then close the connection whether streaming succeeded or failed.
connection.create(process.env.DB_URL, (error, db) => {
  if (error) {
    return console.error(`Error connecting to database. Reason: ${error}`);
  }

  const collectionToStream = 'accounts';
  const accounts = db.collection(collectionToStream);
  // The usual query syntax applies
  const stream = accounts.find({}).sort({ _id: 1 }).stream();

  processStream(stream, (streamError) => {
    db.close();
    if (streamError) {
      return console.error(`Error streaming collection. Reason: ${streamError}`);
    }
    return console.log('Done processing');
  });
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment