Created
May 12, 2018 06:58
-
-
Save slidenerd/f3019b03f5ea9023dfb365d95e60d161 to your computer and use it in GitHub Desktop.
Revisions
-
slidenerd created this gist
May 12, 2018 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,845 @@ const fs = require('fs'), fse = require('fs-extra'), MongoClient = require('mongodb').MongoClient, mongoose = require('mongoose'), ObjectID = require('mongodb').ObjectID, request = require('request') const includes = ["AUD", "BRL", "CAD", "CHF", "CLP", "CNY", "CZK", "DKK", "EUR", "GBP", "HKD", "HUF", "IDR", "ILS", "INR", "JPY", "KRW", "MXN", "MYR", "NOK", "NZD", "PHP", "PKR", "PLN", "RUB", "SEK", "SGD", "THB", "TRY", "TWD", "USD", "ZAR"] function normalizeDestinations(fiats = {}) { const keys = Object.keys(fiats.quotes) const normalizedFiats = [] for (let i = 0, length = keys.length; i < length; i++) { const key = keys[i].replace("USD", "") if (includes && includes.length) { if (includes.indexOf(key) >= 0) { normalizedFiats.push({ symbol: key, price: fiats.quotes[keys[i]] }) } } else { normalizedFiats.push({ symbol: key, price: fiats.quotes[keys[i]] }) } } return normalizedFiats } function getAlerts(sources = [], destinations = [], count = 10, sourceLimit = 10, destinationLimit = 10, userCount = 100, alertProbability = 0.1) { const getRandomPrice = (sourcePrice, destinationPrice, multiplier = 0.1) => { const product = sourcePrice * destinationPrice const gx = product + multiplier * product const lx = product - multiplier * product return +((lx + (gx - lx) * Math.random()).toFixed(2)) } const userIds = [] for (let i = 0; i < userCount; i++) { userIds.push(new ObjectID()) } const alerts = [], condensedAlerts = [] let uniqueAlerts = {} for (let i = 0; i < count; i++) { let sourceIndex do { sourceIndex = Math.floor(Math.random() * Math.min(sourceLimit, sources.length)) } while (sources[sourceIndex].price_usd === null || sources[sourceIndex].price_usd === undefined) let destinationIndex = Math.floor(Math.random() * Math.min(destinations.length, destinationLimit)) const id = sources[sourceIndex].id + ":" + destinations[destinationIndex].symbol const randomUserId = userIds[Math.floor(Math.random() * userIds.length)] const randomPrice = getRandomPrice(+sources[sourceIndex].price_usd, destinations[destinationIndex].price) const randomDirection = Math.floor(Math.random() * 2) === 0 ? false : true const item = { _id: new ObjectID(), //unique alert id 1: randomUserId, //unique user id 2: sources[sourceIndex].id, //from 3: destinations[destinationIndex].symbol, //to 4: id, 5: randomPrice, //price 6: randomDirection, //false = less than alert, true = greater than alert 7: 0, //type, 0 = price alert, percentage alert } alerts.push(item) const condensedItem = { _id: new ObjectID(), //unique alert id 1: randomUserId, //unique user id 2: id, 3: randomPrice, //price 4: randomDirection, //false = less than alert, true = greater than alert 5: 0, //type, 0 = price alert, percentage alert } condensedAlerts.push(condensedItem) if (!uniqueAlerts[id]) { uniqueAlerts[id] = { _id: id, 1: sources[sourceIndex].id, 2: destinations[destinationIndex].symbol, 3: +sources[sourceIndex].price_usd * destinations[destinationIndex].price, 4: 1 } } else { uniqueAlerts[id]['4'] += 1 } } const unique = [], queries = [], condensedUnique = [], keys = Object.keys(uniqueAlerts) for (let i = 0, length = keys.length; i < length; i++) { const key = keys[i] unique.push({ _id: key, 1: uniqueAlerts[key]['1'], 2: uniqueAlerts[key]['2'], 3: uniqueAlerts[key]['3'], 4: uniqueAlerts[key]['4'] }) condensedUnique.push({ _id: key, 1: uniqueAlerts[key]['3'], 2: uniqueAlerts[key]['4'] }) queries.push({ 2: { $eq: key }, $or: [ { 4: { $eq: false }, 3: { $gte: uniqueAlerts[key]['3'] } }, { 4: { $eq: true }, 3: { $lte: uniqueAlerts[key]['3'] } }, ] }) } const query = { $or: queries } return { alerts, unique, condensedAlerts, condensedUnique, query } } function getSourceOperations(sources = []) { const operations = [] for (let i = 0, length = sources.length; i < length; i++) { const source = sources[i] operations.push({ updateOne: { "filter": { _id: source.id }, "update": { _id: source.id, 1: +source.price_usd, 2: source.symbol }, "upsert": true } }) } return operations } function getDestinationOperations(destinations = []) { const operations = [] for (let i = 0, length = destinations.length; i < length; i++) { const key = destinations[i].symbol const item = { updateOne: { "filter": { _id: key }, "update": { 1: destinations[i].price }, "upsert": true } } if (includes && includes.length) { if (includes.indexOf(key) >= 0) { operations.push(item) } } else { operations.push(item) } } return operations } function upsertNative(collectionName, operations) { // Use connect method to connect to the Server MongoClient.connect('mongodb://localhost:27017', function (err, client) { const db = client.db('testalerts') // Insert a single document const t1 = new Date().getTime() db.collection(collectionName).bulkWrite(operations, (error, result) => { const t2 = new Date().getTime() if (error) { console.log(error, "upsertNative", collectionName, (t2 - t1) / 1000, "seconds") } else { console.log( "upsertNative", collectionName, (t2 - t1) / 1000, "seconds", "upserted", result.upsertedCount, "inserted", result.insertedCount, "deleted", result.deletedCount, "matched", result.matchedCount, "modified", result.modifiedCount ) client.close() } }) }) } function deleteInsert(collectionName, items) { MongoClient.connect('mongodb://localhost:27017', function (err, client) { const db = client.db('testalerts') const t1 = new Date().getTime() db.listCollections() .toArray() .then(collections => { const t2 = new Date().getTime() let found = false for (let i = 0, length = collections.length; i < length; i++) { if (collections[i].name === collectionName) { found = true break; } } if (found) { return db.collection(collectionName).drop() } }) .then(result => { if (result) { console.log("Deleted", collectionName, result) } return db.collection(collectionName).insertMany(items) }) .then(result => { console.log("Inserted", collectionName, result.insertedCount, result.result.n, result.result.ok) }) .catch(console.log) .then(() => { client.close() }) }) } function aggregate(collectionName, fileName, pipeline) { // Use connect method to connect to the Server MongoClient.connect('mongodb://localhost:27017', function (err, client) { const db = client.db('testalerts') const t1 = new Date().getTime() db.collection(collectionName).aggregate(pipeline, { allowUseDisk: true }).toArray((error, result) => { const t2 = new Date().getTime() if (error) { console.log(error, "Aggregation", collectionName, (t2 - t1) / 1000, "seconds") } else { console.log("Aggregation", collectionName, (t2 - t1) / 1000, "seconds returned", result.length, "documents") } fse.writeJson(fileName, result.slice(0, 20), { spaces: 4 }) client.close() }) }) } function refreshData(url, fileName, refresh = false) { if (refresh) { request(url, { encoding: 'utf-8' }, (error, response, body) => { if (error) { console.log(error) } else if (response.statusCode !== 200) { console.log(response.statusCode, response.statusMessage) } else { const coins = JSON.parse(body) console.log("Saving", coins.length, "coins", fileName) fse.writeJson(fileName, coins, {}) } }) } } function queryAlerts(collectionName, query) { MongoClient.connect('mongodb://localhost:27017', function (err, client) { const db = client.db('testalerts') const t1 = new Date().getTime() db.collection(collectionName).find(query).toArray((error, result) => { const t2 = new Date().getTime() if (error) { console.log(error, "query alerts", collectionName, (t2 - t1) / 1000, "seconds") } else { console.log("query alerts", collectionName, (t2 - t1) / 1000, "seconds returned", result.length, "documents") } client.close() }) }) } module.exports = { aggregate, deleteInsert, getAlerts, getDestinationOperations, getSourceOperations, normalizeDestinations, queryAlerts, upsertNative, } refreshData("https://api.coinmarketcap.com/v1/ticker/?limit=0", "cmc.json", false) refreshData("http://apilayer.net/api/live?access_key=f8b2013585398ee39a1ef56fc6caf458¤cies=&source=USD&format=2", "fiats.json", false) Index.js const fs = require('fs'), fse = require('fs-extra'), utils = require('./utils'); function aggregateAlerts() { const collectionName = "e1_alerts" const pipeline = [ { "$lookup": { "from": "e1_sources", "localField": "2", "foreignField": "_id", "as": "s" } }, { "$unwind": "$s" }, { "$project": { "_id": 0, "1": 1, "2": 1, "3": 1, "4": 1, "5": 1, "s": "$s.1" } }, { "$lookup": { "from": "e1_destinations", "localField": "3", "foreignField": "_id", "as": "d" } }, { "$unwind": "$d" }, { "$project": { "1": 1, "2": 1, "3": 1, "4": 1, "5": 1, "m": { "$multiply": ["$s", "$d.1"] }, "c": { "$gte": [ "$4", { "$multiply": ["$s", "$d.1"] } ] } } }, { "$match": { "$or": [ { "5": false, "c": true }, { "5": true, "c": false } ] } } ] return { collectionName, pipeline } } function aggregateUniqueAlertsNoPipelineMultipleLookups() { const collectionName = "e1_unique_alerts" const pipeline = [ { "$lookup": { "from": "e1_sources", "localField": "1", "foreignField": "_id", "as": "s" } }, { "$unwind": "$s" }, { "$project": { "_id": 0, "1": 1, "2": 1, "3": 1, "4": "$s.1" } }, { "$lookup": { "from": "e1_destinations", "localField": "2", "foreignField": "_id", "as": "d" } }, { "$unwind": "$d" }, { "$project": { "1": 1, "2": 1, "3": 1, "4": { "$multiply": ["$4", "$d.1"] } } }, { "$lookup": { "from": "e1_alerts", "localField": "3", "foreignField": "4", "as": "a" } }, { "$unwind": "$a" }, { "$project": { "_id": "$a._id", "1": "$a.1", "2": "$3", "3": "$a.5", "4": "$a.6", "5": "$a.7", "6": "$4" } }, { "$match": { "$expr": { "$or": [ { "$and": [ { "$eq": ["$4", false] }, { "$gte": ["$3", "$6"] } ] }, { "$and": [ { "$eq": ["$4", true] }, { "$lte": ["$3", "$6"] } ] } ] } } } ] return { collectionName, pipeline } } //all cryptos = 1525, all fiats = 32, unique alerts count = 42445 alerts count = 100000, time taken = 17.798 seconds //created indexes in the e1_alerts table for source, destination and source:destination combination function aggregateUniqueAlertsPipelineLastStage() { const collectionName = "e1_unique_alerts" const pipeline = [ { "$lookup": { "from": "e1_sources", "localField": "1", "foreignField": "_id", "as": "s" } }, { "$unwind": "$s" }, { "$project": { "_id": 0, "1": 1, "2": 1, "3": 1, "4": "$s.1" } }, { "$lookup": { "from": "e1_destinations", "localField": "2", "foreignField": "_id", "as": "d" } }, { "$unwind": "$d" }, { "$project": { "1": 1, "2": 1, "3": 1, "4": { "$multiply": ["$4", "$d.1"] } } }, { "$lookup": { "from": "e1_alerts", "let": { "pair": "$3", "price": "$4" }, "pipeline": [ { "$match": { "$expr": { "$and": [ { "$eq": ["$$pair", "$4"] }, { "$or": [ { "$and": [ { "$eq": ["$6", false] }, { "$gte": ["$5", "$$price"] } ] }, { "$and": [ { "$eq": ["$6", true] }, { "$lte": ["$5", "$$price"] } ] } ] } ] } } } ], "as": "a" } }, { "$unwind": "$a" }, { "$project": { "_id": "$a._id", "1": "$a.1", "2": "$3", "3": "$a.5", "4": "$a.6", "5": "$a.7", "6": "$4", } } ] return { collectionName, pipeline } } //all cryptos = 1525, all fiats = 32, unique alerts count = 42445 alerts count = 100000, time taken = 24.5 seconds //created indexes in the e1_alerts table for source, destination and source:destination combination, pipelines = bad function aggregateUniqueAlertsPipelineAll() { const collectionName = "e1_unique_alerts" const pipeline = [ { "$lookup": { "from": "e1_sources", "let": { source: "$1" }, "pipeline": [ { "$match": { "$expr": { "$eq": ["$_id", "$$source"] } } }, { "$project": { "1": 1, "_id": 0 } } ], "as": "s" } }, { "$unwind": "$s" }, { "$project": { "_id": 0, "1": 1, "2": 1, "3": "$s.1" } }, { "$lookup": { "from": "e1_destinations", "let": { destination: "$2" }, "pipeline": [ { "$match": { "$expr": { "$eq": ["$_id", "$$destination"] } } }, { "$project": { "_id": 0 } } ], "as": "d" } }, { "$unwind": "$d" }, { "$project": { "1": 1, "2": 1, "3": { "$multiply": ["$3", "$d.1"] } } }, { "$lookup": { "from": "e1_alerts", "let": { source: "$1", destination: "$2", price: "$3" }, "pipeline": [ { "$match": { "$expr": { "$and": [ { "$eq": ["$$source", "$2"] }, { "$eq": ["$$destination", "$3"] }, { "$or": [ { "$and": [ { "$eq": ["$5", false] }, { "$gte": ["$4", "$$price"] } ] }, { "$and": [ { "$eq": ["$5", true] }, { "$lte": ["$4", "$$price"] } ] } ] } ] } } } ], "as": "a" } }, { "$unwind": "$a" }, { "$project": { "_id": "$a._id", "1": "$a.1", "2": "$1", "3": "$2", "4": "$a.4", "5": "$a.5", "6": "$a.6", "7": "$3" } } ] return { collectionName, pipeline } } function aggregateUniqueAlertsNoLookups() { const collectionName = "e1_unique_alerts" const pipeline = [ { "$lookup": { "from": "e1_alerts", "localField": "_id", "foreignField": "4", "as": "a" } }, { "$unwind": "$a" }, { "$project": { "_id": "$a._id", "1": "$a.1", "2": "$a.4", "3": "$a.5", "4": "$a.6", "5": "$a.7", "6": "$3" } }, { "$match": { "$expr": { "$or": [ { "$and": [ { "$eq": ["$4", false] }, { "$gte": ["$3", "$6"] } ] }, { "$and": [ { "$eq": ["$4", true] }, { "$lte": ["$3", "$6"] } ] } ] } } } ] return { collectionName, pipeline } } function aggregateUniqueAlertsCondensedNoLookups() { const collectionName = "e1_unique_alerts_condensed" const pipeline = [ { "$lookup": { "from": "e1_alerts_condensed", "localField": "_id", "foreignField": "2", "as": "a" } }, { "$unwind": "$a" }, { "$project": { "_id": "$a._id", "1": "$a.1", "2": "$a.2", "3": "$a.3", "4": "$a.4", "5": "$a.5", "6": "$1" } }, { "$match": { "$expr": { "$or": [ { "$and": [ { "$eq": ["$4", false] }, { "$gte": ["$3", "$6"] } ] }, { "$and": [ { "$eq": ["$4", true] }, { "$lte": ["$3", "$6"] } ] } ] } } } ] return { collectionName, pipeline } } async function refreshDatabase(refresh = false) { if (refresh) { let fiats = await fse.readJson("fiats.json") fiats = utils.normalizeDestinations(fiats) const cryptos = await fse.readJson("cmc.json") const sourceOperations = utils.getSourceOperations(cryptos) utils.upsertNative("e1_sources", sourceOperations) const destinationOperations = utils.getDestinationOperations(fiats) utils.upsertNative("e1_destinations", destinationOperations) const { alerts, unique, condensedAlerts, condensedUnique, query } = utils.getAlerts(cryptos, fiats, 10000, 1, 1) // fse.writeJson("alerts.json", alerts) // fse.writeJson("unique.json", unique) // fse.writeJson("condensed_alerts.json", condensedAlerts) // fse.writeJson("condensed_unique.json", condensedUnique) fse.writeJson("query.json", query, { spaces: 4 }) utils.deleteInsert("e1_alerts", alerts) utils.deleteInsert("e1_unique_alerts", unique) utils.deleteInsert("e1_alerts_condensed", condensedAlerts) utils.deleteInsert("e1_unique_alerts_condensed", condensedUnique) setTimeout(() => { utils.queryAlerts("e1_alerts_condensed", query) }, 30000) } } (function () { refreshDatabase(true) const run1 = false const run2 = false if (run1) { const { collectionName, pipeline } = aggregateUniqueAlertsNoLookups() utils.aggregate(collectionName, "aggregation.json", pipeline) } if (run2) { const { collectionName, pipeline } = aggregateUniqueAlertsCondensedNoLookups() utils.aggregate(collectionName, "aggregation_condensed.json", pipeline) } })()