const fs = require('fs');
const fse = require('fs-extra');
const MongoClient = require('mongodb').MongoClient;
const mongoose = require('mongoose');
const ObjectID = require('mongodb').ObjectID;
const request = require('request');

// Fiat symbols that are kept by the helpers below. An empty list disables the filter.
const includes = [
    "AUD", "BRL", "CAD", "CHF", "CLP", "CNY", "CZK", "DKK",
    "EUR", "GBP", "HKD", "HUF", "IDR", "ILS", "INR", "JPY",
    "KRW", "MXN", "MYR", "NOK", "NZD", "PHP", "PKR", "PLN",
    "RUB", "SEK", "SGD", "THB", "TRY", "TWD", "USD", "ZAR"
];

/**
 * Tells whether a fiat symbol passes the `includes` whitelist.
 * Every symbol passes when the whitelist is empty.
 *
 * @param {string} symbol - fiat symbol, e.g. "EUR"
 * @returns {boolean}
 */
function isIncludedSymbol(symbol) {
    return !(includes && includes.length) || includes.indexOf(symbol) >= 0;
}

/**
 * Converts a quotes response into a list of `{ symbol, price }` entries.
 *
 * The first "USD" occurrence is stripped from each key of `fiats.quotes`
 * (so "USDEUR" becomes "EUR" and "USDUSD" becomes "USD"); entries whose
 * symbol is not whitelisted by `includes` are dropped.
 *
 * @param {{quotes?: Object<string, number>}} [fiats] - object carrying a `quotes` map
 * @returns {Array<{symbol: string, price: number}>} entries in key order; empty when
 *          `fiats` or `fiats.quotes` is missing
 */
function normalizeDestinations(fiats = {}) {
    // A missing `quotes` map used to throw inside Object.keys; treat it as "no quotes".
    const quotes = (fiats && fiats.quotes) || {};
    return Object.keys(quotes)
        .map(pair => ({ symbol: pair.replace("USD", ""), price: quotes[pair] }))
        .filter(fiat => isIncludedSymbol(fiat.symbol));
}

/**
 * Generates random test alerts for source/destination pairs.
 *
 * Sources are drawn uniformly from the first `sourceLimit` entries of `sources`
 * that have a `price_usd`; destinations from the first `destinationLimit`
 * entries of `destinations`. When either pool is empty no alerts are generated
 * and the same result shape is returned with empty arrays.
 *
 * @param {Array<{id: string, price_usd: (string|number|null)}>} [sources]
 * @param {Array<{symbol: string, price: number}>} [destinations]
 * @param {number} [count] - number of alerts to generate
 * @param {number} [sourceLimit] - only the first N sources are candidates
 * @param {number} [destinationLimit] - only the first N destinations are candidates
 * @param {number} [userCount] - number of random user ids to spread alerts over
 * @param {number} [alertProbability] - accepted for compatibility; not used
 * @returns {{alerts: Object[], unique: Object[], condensedAlerts: Object[],
 *            condensedUnique: Object[], query: {$or: Object[]}}}
 *          `alerts`/`condensedAlerts` hold one document per generated alert,
 *          `unique`/`condensedUnique` one document per distinct "source:destination"
 *          pair, and `query` is a filter written against the condensed alert fields.
 */
function getAlerts(sources = [], destinations = [], count = 10, sourceLimit = 10, destinationLimit = 10, userCount = 100, alertProbability = 0.1) {
    // Random price within +/- `multiplier` of the pair's current price, rounded to 2 decimals.
    const getRandomPrice = (sourcePrice, destinationPrice, multiplier = 0.1) => {
        const product = sourcePrice * destinationPrice;
        const upper = product + multiplier * product;
        const lower = product - multiplier * product;
        return +((lower + (upper - lower) * Math.random()).toFixed(2));
    };
    const pickRandom = items => items[Math.floor(Math.random() * items.length)];

    // Filtering the candidates up front replaces the retry loop, which never
    // terminated when no candidate had a price and crashed on empty input.
    const sourcePool = sources
        .slice(0, Math.max(0, Math.min(sourceLimit, sources.length)))
        .filter(source => source.price_usd !== null && source.price_usd !== undefined);
    const destinationPool = destinations
        .slice(0, Math.max(0, Math.min(destinations.length, destinationLimit)));
    const canGenerate = sourcePool.length > 0 && destinationPool.length > 0;

    const userIds = [];
    for (let i = 0; i < userCount; i++) {
        userIds.push(new ObjectID());
    }

    const alerts = [];
    const condensedAlerts = [];
    const uniqueAlerts = new Map(); // pair id -> unique-alert document, in first-seen order

    for (let i = 0; canGenerate && i < count; i++) {
        const source = pickRandom(sourcePool);
        const destination = pickRandom(destinationPool);
        const id = source.id + ":" + destination.symbol;
        const randomUserId = pickRandom(userIds);
        const randomPrice = getRandomPrice(+source.price_usd, destination.price);
        const randomDirection = Math.random() >= 0.5;

        alerts.push({
            _id: new ObjectID(), // unique alert id
            1: randomUserId, // unique user id
            2: source.id, // from
            3: destination.symbol, // to
            4: id, // "from:to" pair id
            5: randomPrice, // price
            6: randomDirection, // false = less than alert, true = greater than alert
            7: 0, // type, 0 = price alert (the original note also lists "percentage alert" without a value)
        });

        condensedAlerts.push({
            _id: new ObjectID(), // unique alert id
            1: randomUserId, // unique user id
            2: id, // "from:to" pair id
            3: randomPrice, // price
            4: randomDirection, // false = less than alert, true = greater than alert
            5: 0, // type, 0 = price alert (the original note also lists "percentage alert" without a value)
        });

        const known = uniqueAlerts.get(id);
        if (known) {
            known['4'] += 1;
        } else {
            uniqueAlerts.set(id, {
                _id: id,
                1: source.id,
                2: destination.symbol,
                3: +source.price_usd * destination.price, // current price of the pair
                4: 1 // number of alerts generated for the pair
            });
        }
    }

    const unique = [];
    const condensedUnique = [];
    const queries = [];
    uniqueAlerts.forEach((entry, key) => {
        unique.push({ _id: key, 1: entry['1'], 2: entry['2'], 3: entry['3'], 4: entry['4'] });
        condensedUnique.push({ _id: key, 1: entry['3'], 2: entry['4'] });
        // Condensed alerts of this pair whose price is on the given side of the current price.
        queries.push({
            2: { $eq: key },
            $or: [
                { 4: { $eq: false }, 3: { $gte: entry['3'] } },
                { 4: { $eq: true }, 3: { $lte: entry['3'] } },
            ]
        });
    });

    return { alerts, unique, condensedAlerts, condensedUnique, query: { $or: queries } };
}

/**
 * Builds one upserting `updateOne` bulk operation per source.
 *
 * NOTE(review): the update document contains no update operators (it is a
 * replacement-style document) - confirm the installed driver accepts that
 * for `updateOne` inside `bulkWrite`.
 *
 * @param {Array<{id: string, price_usd: (string|number), symbol: string}>} [sources]
 * @returns {Object[]} bulkWrite operations, one per source, in input order
 */
function getSourceOperations(sources = []) {
    return sources.map(source => ({
        updateOne: {
            "filter": { _id: source.id },
            "update": { _id: source.id, 1: +source.price_usd, 2: source.symbol },
            "upsert": true
        }
    }));
}

/**
 * Builds one upserting `updateOne` bulk operation per whitelisted destination.
 *
 * NOTE(review): as in getSourceOperations, the update document has no update
 * operators - confirm the installed driver accepts it.
 *
 * @param {Array<{symbol: string, price: number}>} [destinations]
 * @returns {Object[]} bulkWrite operations for destinations passing `includes`
 */
function getDestinationOperations(destinations = []) {
    return destinations
        .filter(destination => isIncludedSymbol(destination.symbol))
        .map(destination => ({
            updateOne: {
                "filter": { _id: destination.symbol },
                "update": { 1: destination.price },
                "upsert": true
            }
        }));
}

/**
 * Runs `operations` as a bulk write against `collectionName` in the local
 * `testalerts` database and logs the outcome with the elapsed time.
 *
 * @param {string} collectionName
 * @param {Object[]} operations - bulkWrite operations
 * @returns {void} results are only logged
 */
function upsertNative(collectionName, operations) {
    MongoClient.connect('mongodb://localhost:27017', function (err, client) {
        if (err) {
            // Without this guard `client` is undefined and the next line throws.
            console.log(err, "upsertNative", collectionName, "connection failed");
            return;
        }
        const db = client.db('testalerts');
        const startedAt = Date.now();
        db.collection(collectionName).bulkWrite(operations, (error, result) => {
            const seconds = (Date.now() - startedAt) / 1000;
            if (error) {
                console.log(error, "upsertNative", collectionName, seconds, "seconds");
            } else {
                console.log(
                    "upsertNative", collectionName, seconds, "seconds",
                    "upserted", result.upsertedCount,
                    "inserted", result.insertedCount,
                    "deleted", result.deletedCount,
                    "matched", result.matchedCount,
                    "modified", result.modifiedCount
                );
            }
            // Close on both paths; the connection used to stay open after a failed write.
            client.close();
        });
    });
}

/**
 * Replaces the contents of `collectionName` in the local `testalerts`
 * database: drops the collection when it exists, then inserts `items`.
 * Progress and errors are logged; the client is closed afterwards.
 *
 * @param {string} collectionName
 * @param {Object[]} items - documents to insert
 * @returns {void}
 */
function deleteInsert(collectionName, items) {
    MongoClient.connect('mongodb://localhost:27017', function (err, client) {
        if (err) {
            console.log(err, "deleteInsert", collectionName, "connection failed");
            return;
        }
        const db = client.db('testalerts');
        db.listCollections()
            .toArray()
            .then(collections => {
                const exists = collections.some(collection => collection.name === collectionName);
                // Resolves to undefined when there is nothing to drop.
                return exists ? db.collection(collectionName).drop() : undefined;
            })
            .then(dropped => {
                if (dropped) {
                    console.log("Deleted", collectionName, dropped);
                }
                return db.collection(collectionName).insertMany(items);
            })
            .then(inserted => {
                console.log("Inserted", collectionName, inserted.insertedCount, inserted.result.n, inserted.result.ok);
            })
            .catch(console.log)
            .then(() => {
                client.close();
            });
    });
}
function aggregate(collectionName, fileName, pipeline) { // Use connect method to connect to the Server MongoClient.connect('mongodb://localhost:27017', function (err, client) { const db = client.db('testalerts') const t1 = new Date().getTime() db.collection(collectionName).aggregate(pipeline, { allowUseDisk: true }).toArray((error, result) => { const t2 = new Date().getTime() if (error) { console.log(error, "Aggregation", collectionName, (t2 - t1) / 1000, "seconds") } else { console.log("Aggregation", collectionName, (t2 - t1) / 1000, "seconds returned", result.length, "documents") } fse.writeJson(fileName, result.slice(0, 20), { spaces: 4 }) client.close() }) }) } function refreshData(url, fileName, refresh = false) { if (refresh) { request(url, { encoding: 'utf-8' }, (error, response, body) => { if (error) { console.log(error) } else if (response.statusCode !== 200) { console.log(response.statusCode, response.statusMessage) } else { const coins = JSON.parse(body) console.log("Saving", coins.length, "coins", fileName) fse.writeJson(fileName, coins, {}) } }) } } function queryAlerts(collectionName, query) { MongoClient.connect('mongodb://localhost:27017', function (err, client) { const db = client.db('testalerts') const t1 = new Date().getTime() db.collection(collectionName).find(query).toArray((error, result) => { const t2 = new Date().getTime() if (error) { console.log(error, "query alerts", collectionName, (t2 - t1) / 1000, "seconds") } else { console.log("query alerts", collectionName, (t2 - t1) / 1000, "seconds returned", result.length, "documents") } client.close() }) }) } module.exports = { aggregate, deleteInsert, getAlerts, getDestinationOperations, getSourceOperations, normalizeDestinations, queryAlerts, upsertNative, } refreshData("https://api.coinmarketcap.com/v1/ticker/?limit=0", "cmc.json", false) refreshData("http://apilayer.net/api/live?access_key=f8b2013585398ee39a1ef56fc6caf458¤cies=&source=USD&format=2", "fiats.json", false) Index.js const fs = 
require('fs'), fse = require('fs-extra'), utils = require('./utils'); function aggregateAlerts() { const collectionName = "e1_alerts" const pipeline = [ { "$lookup": { "from": "e1_sources", "localField": "2", "foreignField": "_id", "as": "s" } }, { "$unwind": "$s" }, { "$project": { "_id": 0, "1": 1, "2": 1, "3": 1, "4": 1, "5": 1, "s": "$s.1" } }, { "$lookup": { "from": "e1_destinations", "localField": "3", "foreignField": "_id", "as": "d" } }, { "$unwind": "$d" }, { "$project": { "1": 1, "2": 1, "3": 1, "4": 1, "5": 1, "m": { "$multiply": ["$s", "$d.1"] }, "c": { "$gte": [ "$4", { "$multiply": ["$s", "$d.1"] } ] } } }, { "$match": { "$or": [ { "5": false, "c": true }, { "5": true, "c": false } ] } } ] return { collectionName, pipeline } } function aggregateUniqueAlertsNoPipelineMultipleLookups() { const collectionName = "e1_unique_alerts" const pipeline = [ { "$lookup": { "from": "e1_sources", "localField": "1", "foreignField": "_id", "as": "s" } }, { "$unwind": "$s" }, { "$project": { "_id": 0, "1": 1, "2": 1, "3": 1, "4": "$s.1" } }, { "$lookup": { "from": "e1_destinations", "localField": "2", "foreignField": "_id", "as": "d" } }, { "$unwind": "$d" }, { "$project": { "1": 1, "2": 1, "3": 1, "4": { "$multiply": ["$4", "$d.1"] } } }, { "$lookup": { "from": "e1_alerts", "localField": "3", "foreignField": "4", "as": "a" } }, { "$unwind": "$a" }, { "$project": { "_id": "$a._id", "1": "$a.1", "2": "$3", "3": "$a.5", "4": "$a.6", "5": "$a.7", "6": "$4" } }, { "$match": { "$expr": { "$or": [ { "$and": [ { "$eq": ["$4", false] }, { "$gte": ["$3", "$6"] } ] }, { "$and": [ { "$eq": ["$4", true] }, { "$lte": ["$3", "$6"] } ] } ] } } } ] return { collectionName, pipeline } } //all cryptos = 1525, all fiats = 32, unique alerts count = 42445 alerts count = 100000, time taken = 17.798 seconds //created indexes in the e1_alerts table for source, destination and source:destination combination function aggregateUniqueAlertsPipelineLastStage() { const collectionName = 
"e1_unique_alerts" const pipeline = [ { "$lookup": { "from": "e1_sources", "localField": "1", "foreignField": "_id", "as": "s" } }, { "$unwind": "$s" }, { "$project": { "_id": 0, "1": 1, "2": 1, "3": 1, "4": "$s.1" } }, { "$lookup": { "from": "e1_destinations", "localField": "2", "foreignField": "_id", "as": "d" } }, { "$unwind": "$d" }, { "$project": { "1": 1, "2": 1, "3": 1, "4": { "$multiply": ["$4", "$d.1"] } } }, { "$lookup": { "from": "e1_alerts", "let": { "pair": "$3", "price": "$4" }, "pipeline": [ { "$match": { "$expr": { "$and": [ { "$eq": ["$$pair", "$4"] }, { "$or": [ { "$and": [ { "$eq": ["$6", false] }, { "$gte": ["$5", "$$price"] } ] }, { "$and": [ { "$eq": ["$6", true] }, { "$lte": ["$5", "$$price"] } ] } ] } ] } } } ], "as": "a" } }, { "$unwind": "$a" }, { "$project": { "_id": "$a._id", "1": "$a.1", "2": "$3", "3": "$a.5", "4": "$a.6", "5": "$a.7", "6": "$4", } } ] return { collectionName, pipeline } } //all cryptos = 1525, all fiats = 32, unique alerts count = 42445 alerts count = 100000, time taken = 24.5 seconds //created indexes in the e1_alerts table for source, destination and source:destination combination, pipelines = bad function aggregateUniqueAlertsPipelineAll() { const collectionName = "e1_unique_alerts" const pipeline = [ { "$lookup": { "from": "e1_sources", "let": { source: "$1" }, "pipeline": [ { "$match": { "$expr": { "$eq": ["$_id", "$$source"] } } }, { "$project": { "1": 1, "_id": 0 } } ], "as": "s" } }, { "$unwind": "$s" }, { "$project": { "_id": 0, "1": 1, "2": 1, "3": "$s.1" } }, { "$lookup": { "from": "e1_destinations", "let": { destination: "$2" }, "pipeline": [ { "$match": { "$expr": { "$eq": ["$_id", "$$destination"] } } }, { "$project": { "_id": 0 } } ], "as": "d" } }, { "$unwind": "$d" }, { "$project": { "1": 1, "2": 1, "3": { "$multiply": ["$3", "$d.1"] } } }, { "$lookup": { "from": "e1_alerts", "let": { source: "$1", destination: "$2", price: "$3" }, "pipeline": [ { "$match": { "$expr": { "$and": [ { "$eq": ["$$source", 
"$2"] }, { "$eq": ["$$destination", "$3"] }, { "$or": [ { "$and": [ { "$eq": ["$5", false] }, { "$gte": ["$4", "$$price"] } ] }, { "$and": [ { "$eq": ["$5", true] }, { "$lte": ["$4", "$$price"] } ] } ] } ] } } } ], "as": "a" } }, { "$unwind": "$a" }, { "$project": { "_id": "$a._id", "1": "$a.1", "2": "$1", "3": "$2", "4": "$a.4", "5": "$a.5", "6": "$a.6", "7": "$3" } } ] return { collectionName, pipeline } } function aggregateUniqueAlertsNoLookups() { const collectionName = "e1_unique_alerts" const pipeline = [ { "$lookup": { "from": "e1_alerts", "localField": "_id", "foreignField": "4", "as": "a" } }, { "$unwind": "$a" }, { "$project": { "_id": "$a._id", "1": "$a.1", "2": "$a.4", "3": "$a.5", "4": "$a.6", "5": "$a.7", "6": "$3" } }, { "$match": { "$expr": { "$or": [ { "$and": [ { "$eq": ["$4", false] }, { "$gte": ["$3", "$6"] } ] }, { "$and": [ { "$eq": ["$4", true] }, { "$lte": ["$3", "$6"] } ] } ] } } } ] return { collectionName, pipeline } } function aggregateUniqueAlertsCondensedNoLookups() { const collectionName = "e1_unique_alerts_condensed" const pipeline = [ { "$lookup": { "from": "e1_alerts_condensed", "localField": "_id", "foreignField": "2", "as": "a" } }, { "$unwind": "$a" }, { "$project": { "_id": "$a._id", "1": "$a.1", "2": "$a.2", "3": "$a.3", "4": "$a.4", "5": "$a.5", "6": "$1" } }, { "$match": { "$expr": { "$or": [ { "$and": [ { "$eq": ["$4", false] }, { "$gte": ["$3", "$6"] } ] }, { "$and": [ { "$eq": ["$4", true] }, { "$lte": ["$3", "$6"] } ] } ] } } } ] return { collectionName, pipeline } } async function refreshDatabase(refresh = false) { if (refresh) { let fiats = await fse.readJson("fiats.json") fiats = utils.normalizeDestinations(fiats) const cryptos = await fse.readJson("cmc.json") const sourceOperations = utils.getSourceOperations(cryptos) utils.upsertNative("e1_sources", sourceOperations) const destinationOperations = utils.getDestinationOperations(fiats) utils.upsertNative("e1_destinations", destinationOperations) const { alerts, 
unique, condensedAlerts, condensedUnique, query } = utils.getAlerts(cryptos, fiats, 10000, 1, 1) // fse.writeJson("alerts.json", alerts) // fse.writeJson("unique.json", unique) // fse.writeJson("condensed_alerts.json", condensedAlerts) // fse.writeJson("condensed_unique.json", condensedUnique) fse.writeJson("query.json", query, { spaces: 4 }) utils.deleteInsert("e1_alerts", alerts) utils.deleteInsert("e1_unique_alerts", unique) utils.deleteInsert("e1_alerts_condensed", condensedAlerts) utils.deleteInsert("e1_unique_alerts_condensed", condensedUnique) setTimeout(() => { utils.queryAlerts("e1_alerts_condensed", query) }, 30000) } } (function () { refreshDatabase(true) const run1 = false const run2 = false if (run1) { const { collectionName, pipeline } = aggregateUniqueAlertsNoLookups() utils.aggregate(collectionName, "aggregation.json", pipeline) } if (run2) { const { collectionName, pipeline } = aggregateUniqueAlertsCondensedNoLookups() utils.aggregate(collectionName, "aggregation_condensed.json", pipeline) } })()