Created
July 4, 2016 18:42
-
-
Save tanzim/a0d04580698c9e2b8d27e1b182a48a0c to your computer and use it in GitHub Desktop.
Streaming data from MongoDB
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| 'use strict'; | |
| const mongodb = require('mongodb'); | |
| // MongoDB connection defaults | |
| const OPTION_AUTO_RECONNECT = true; | |
| const OPTION_SOCKET_OPTIONS = { | |
| keepAlive: 1, | |
| connectTimeoutMS: 30000 | |
| }; | |
| const POOL_SIZE = 10; | |
| const DEFAULT_MONGODB_CONNECTION_OPTIONS = { | |
| server: { | |
| poolSize: POOL_SIZE, | |
| auto_reconnect: OPTION_AUTO_RECONNECT, | |
| socketOptions: OPTION_SOCKET_OPTIONS | |
| }, | |
| replset: { | |
| poolSize: POOL_SIZE, | |
| auto_reconnect: OPTION_AUTO_RECONNECT, | |
| socketOptions: OPTION_SOCKET_OPTIONS | |
| } | |
| }; | |
| function create(url, callback) { | |
| mongodb.MongoClient.connect(url, DEFAULT_MONGODB_CONNECTION_OPTIONS, callback); | |
| } | |
| module.exports = { | |
| create | |
| }; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| 'use strict'; | |
| const connection = require('./connection'); | |
| // Processes a stream of documents | |
| function process(stream, callback) { | |
| stream.on('data', (document) => { | |
| // Do something with the the document | |
| }).on('end', () => { | |
| return callback(); | |
| }).on('error', (error) => { | |
| return callback(error); | |
| }); | |
| } | |
| connection.create(process.env.DB_URL, (error, db) => { | |
| if (error) { | |
| return console.error(`Error connecting to database. Reason: ${error}`); | |
| } | |
| const collectionToStream = 'accounts'; | |
| const accounts = db.collection(collectionToStream); | |
| const stream = collection.find({}).sort({ _id: 1 }).stream(); // The usual query syntax applies | |
| process(stream, (error) => { | |
| db.close(); | |
| if (error) { | |
| return console.error(`Error streaming collection. Reason: ${error}`); | |
| } | |
| return console.log('Done processing'); | |
| }); | |
| }); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment