Skip to content

Instantly share code, notes, and snippets.

@slidenerd
Created May 12, 2018 06:58
Show Gist options
  • Select an option

  • Save slidenerd/f3019b03f5ea9023dfb365d95e60d161 to your computer and use it in GitHub Desktop.

Select an option

Save slidenerd/f3019b03f5ea9023dfb365d95e60d161 to your computer and use it in GitHub Desktop.

Revisions

  1. slidenerd created this gist May 12, 2018.
    845 changes: 845 additions & 0 deletions mongodbtestalerts.js
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,845 @@
    const
    fs = require('fs'),
    fse = require('fs-extra'),
    MongoClient = require('mongodb').MongoClient,
    mongoose = require('mongoose'),
    ObjectID = require('mongodb').ObjectID,
    request = require('request')

    const includes = ["AUD", "BRL", "CAD", "CHF", "CLP", "CNY", "CZK", "DKK", "EUR", "GBP", "HKD", "HUF", "IDR", "ILS", "INR", "JPY", "KRW", "MXN", "MYR", "NOK", "NZD", "PHP", "PKR", "PLN", "RUB", "SEK", "SGD", "THB", "TRY", "TWD", "USD", "ZAR"]

    function normalizeDestinations(fiats = {}) {
    const keys = Object.keys(fiats.quotes)
    const normalizedFiats = []
    for (let i = 0, length = keys.length; i < length; i++) {
    const key = keys[i].replace("USD", "")
    if (includes && includes.length) {
    if (includes.indexOf(key) >= 0) {
    normalizedFiats.push({
    symbol: key,
    price: fiats.quotes[keys[i]]
    })
    }
    }
    else {
    normalizedFiats.push({
    symbol: key,
    price: fiats.quotes[keys[i]]
    })
    }
    }
    return normalizedFiats
    }

    function getAlerts(sources = [], destinations = [], count = 10, sourceLimit = 10, destinationLimit = 10, userCount = 100, alertProbability = 0.1) {

    const getRandomPrice = (sourcePrice, destinationPrice, multiplier = 0.1) => {
    const product = sourcePrice * destinationPrice
    const gx = product + multiplier * product
    const lx = product - multiplier * product
    return +((lx + (gx - lx) * Math.random()).toFixed(2))
    }

    const userIds = []
    for (let i = 0; i < userCount; i++) {
    userIds.push(new ObjectID())
    }

    const alerts = [], condensedAlerts = []
    let uniqueAlerts = {}
    for (let i = 0; i < count; i++) {
    let sourceIndex
    do {
    sourceIndex = Math.floor(Math.random() * Math.min(sourceLimit, sources.length))
    }
    while (sources[sourceIndex].price_usd === null || sources[sourceIndex].price_usd === undefined)

    let destinationIndex = Math.floor(Math.random() * Math.min(destinations.length, destinationLimit))
    const id = sources[sourceIndex].id + ":" + destinations[destinationIndex].symbol
    const randomUserId = userIds[Math.floor(Math.random() * userIds.length)]
    const randomPrice = getRandomPrice(+sources[sourceIndex].price_usd, destinations[destinationIndex].price)
    const randomDirection = Math.floor(Math.random() * 2) === 0 ? false : true
    const item = {
    _id: new ObjectID(), //unique alert id
    1: randomUserId, //unique user id
    2: sources[sourceIndex].id, //from
    3: destinations[destinationIndex].symbol, //to
    4: id,
    5: randomPrice, //price
    6: randomDirection, //false = less than alert, true = greater than alert
    7: 0, //type, 0 = price alert, percentage alert
    }
    alerts.push(item)

    const condensedItem = {
    _id: new ObjectID(), //unique alert id
    1: randomUserId, //unique user id
    2: id,
    3: randomPrice, //price
    4: randomDirection, //false = less than alert, true = greater than alert
    5: 0, //type, 0 = price alert, percentage alert
    }
    condensedAlerts.push(condensedItem)

    if (!uniqueAlerts[id]) {
    uniqueAlerts[id] = {
    _id: id,
    1: sources[sourceIndex].id,
    2: destinations[destinationIndex].symbol,
    3: +sources[sourceIndex].price_usd * destinations[destinationIndex].price,
    4: 1
    }
    }
    else {
    uniqueAlerts[id]['4'] += 1
    }
    }

    const unique = [], queries = [], condensedUnique = [], keys = Object.keys(uniqueAlerts)
    for (let i = 0, length = keys.length; i < length; i++) {
    const key = keys[i]
    unique.push({
    _id: key,
    1: uniqueAlerts[key]['1'],
    2: uniqueAlerts[key]['2'],
    3: uniqueAlerts[key]['3'],
    4: uniqueAlerts[key]['4']
    })
    condensedUnique.push({
    _id: key,
    1: uniqueAlerts[key]['3'],
    2: uniqueAlerts[key]['4']
    })
    queries.push({
    2: { $eq: key },
    $or: [
    { 4: { $eq: false }, 3: { $gte: uniqueAlerts[key]['3'] } },
    { 4: { $eq: true }, 3: { $lte: uniqueAlerts[key]['3'] } },
    ]
    })
    }

    const query = {
    $or: queries
    }
    return { alerts, unique, condensedAlerts, condensedUnique, query }
    }

    function getSourceOperations(sources = []) {

    const operations = []
    for (let i = 0, length = sources.length; i < length; i++) {
    const source = sources[i]
    operations.push({
    updateOne:
    {
    "filter": { _id: source.id },
    "update": {
    _id: source.id,
    1: +source.price_usd,
    2: source.symbol
    },
    "upsert": true
    }
    })

    }

    return operations
    }

    function getDestinationOperations(destinations = []) {
    const operations = []

    for (let i = 0, length = destinations.length; i < length; i++) {
    const key = destinations[i].symbol
    const item = {
    updateOne:
    {
    "filter": { _id: key },
    "update": {
    1: destinations[i].price
    },
    "upsert": true
    }
    }
    if (includes && includes.length) {
    if (includes.indexOf(key) >= 0) {
    operations.push(item)
    }
    }
    else {
    operations.push(item)
    }
    }

    return operations
    }

    function upsertNative(collectionName, operations) {

    // Use connect method to connect to the Server
    MongoClient.connect('mongodb://localhost:27017', function (err, client) {
    const db = client.db('testalerts')
    // Insert a single document
    const t1 = new Date().getTime()
    db.collection(collectionName).bulkWrite(operations, (error, result) => {
    const t2 = new Date().getTime()
    if (error) {
    console.log(error, "upsertNative", collectionName, (t2 - t1) / 1000, "seconds")
    }
    else {
    console.log(
    "upsertNative",
    collectionName,
    (t2 - t1) / 1000,
    "seconds",
    "upserted", result.upsertedCount,
    "inserted", result.insertedCount,
    "deleted", result.deletedCount,
    "matched", result.matchedCount,
    "modified", result.modifiedCount
    )
    client.close()
    }
    })
    })
    }

    function deleteInsert(collectionName, items) {
    MongoClient.connect('mongodb://localhost:27017', function (err, client) {
    const db = client.db('testalerts')
    const t1 = new Date().getTime()
    db.listCollections()
    .toArray()
    .then(collections => {
    const t2 = new Date().getTime()
    let found = false
    for (let i = 0, length = collections.length; i < length; i++) {
    if (collections[i].name === collectionName) {
    found = true
    break;
    }
    }

    if (found) {
    return db.collection(collectionName).drop()
    }
    })
    .then(result => {
    if (result) {
    console.log("Deleted", collectionName, result)
    }

    return db.collection(collectionName).insertMany(items)
    })
    .then(result => {
    console.log("Inserted", collectionName, result.insertedCount, result.result.n, result.result.ok)
    })
    .catch(console.log)
    .then(() => {
    client.close()
    })
    })
    }

    function aggregate(collectionName, fileName, pipeline) {
    // Use connect method to connect to the Server
    MongoClient.connect('mongodb://localhost:27017', function (err, client) {
    const db = client.db('testalerts')
    const t1 = new Date().getTime()
    db.collection(collectionName).aggregate(pipeline, { allowUseDisk: true }).toArray((error, result) => {
    const t2 = new Date().getTime()
    if (error) {
    console.log(error, "Aggregation", collectionName, (t2 - t1) / 1000, "seconds")
    }
    else {
    console.log("Aggregation", collectionName, (t2 - t1) / 1000, "seconds returned", result.length, "documents")
    }
    fse.writeJson(fileName, result.slice(0, 20), { spaces: 4 })
    client.close()
    })
    })
    }

    function refreshData(url, fileName, refresh = false) {
    if (refresh) {
    request(url, { encoding: 'utf-8' }, (error, response, body) => {
    if (error) {
    console.log(error)
    }
    else if (response.statusCode !== 200) {
    console.log(response.statusCode, response.statusMessage)
    }
    else {

    const coins = JSON.parse(body)
    console.log("Saving", coins.length, "coins", fileName)
    fse.writeJson(fileName, coins, {})
    }
    })
    }
    }

    function queryAlerts(collectionName, query) {
    MongoClient.connect('mongodb://localhost:27017', function (err, client) {
    const db = client.db('testalerts')
    const t1 = new Date().getTime()
    db.collection(collectionName).find(query).toArray((error, result) => {
    const t2 = new Date().getTime()
    if (error) {
    console.log(error, "query alerts", collectionName, (t2 - t1) / 1000, "seconds")
    }
    else {
    console.log("query alerts", collectionName, (t2 - t1) / 1000, "seconds returned", result.length, "documents")
    }
    client.close()
    })
    })
    }

    module.exports = {
    aggregate,
    deleteInsert,
    getAlerts,
    getDestinationOperations,
    getSourceOperations,
    normalizeDestinations,
    queryAlerts,
    upsertNative,
    }

    refreshData("https://api.coinmarketcap.com/v1/ticker/?limit=0", "cmc.json", false)
    refreshData("http://apilayer.net/api/live?access_key=f8b2013585398ee39a1ef56fc6caf458&currencies=&source=USD&format=2", "fiats.json", false)
    Index.js
    const
    fs = require('fs'),
    fse = require('fs-extra'),
    utils = require('./utils');

    function aggregateAlerts() {
    const collectionName = "e1_alerts"
    const pipeline = [
    {
    "$lookup": {
    "from": "e1_sources",
    "localField": "2",
    "foreignField": "_id",
    "as": "s"
    }
    },
    {
    "$unwind": "$s"
    },
    {
    "$project": {

    "_id": 0,
    "1": 1,
    "2": 1,
    "3": 1,
    "4": 1,
    "5": 1,
    "s": "$s.1"
    }
    },
    {
    "$lookup": {
    "from": "e1_destinations",
    "localField": "3",
    "foreignField": "_id",
    "as": "d"
    }
    },
    {
    "$unwind": "$d"
    },
    {
    "$project":
    {
    "1": 1,
    "2": 1,
    "3": 1,
    "4": 1,
    "5": 1,
    "m": {
    "$multiply": ["$s", "$d.1"]
    },
    "c":
    {
    "$gte":
    [
    "$4",
    {
    "$multiply": ["$s", "$d.1"]
    }
    ]

    }
    }
    },
    {
    "$match":
    {
    "$or": [
    {
    "5": false,
    "c": true
    },
    {
    "5": true,

    "c": false
    }
    ]
    }
    }
    ]

    return { collectionName, pipeline }
    }

    function aggregateUniqueAlertsNoPipelineMultipleLookups() {
    const collectionName = "e1_unique_alerts"
    const pipeline = [
    {
    "$lookup": {
    "from": "e1_sources",
    "localField": "1",
    "foreignField": "_id",
    "as": "s"
    }
    },
    { "$unwind": "$s" },
    {
    "$project": {

    "_id": 0,
    "1": 1,
    "2": 1,
    "3": 1,
    "4": "$s.1"
    }
    },
    {
    "$lookup": {
    "from": "e1_destinations",
    "localField": "2",
    "foreignField": "_id",
    "as": "d"
    }
    },
    { "$unwind": "$d" },
    { "$project": { "1": 1, "2": 1, "3": 1, "4": { "$multiply": ["$4", "$d.1"] } } },
    {
    "$lookup": {
    "from": "e1_alerts",
    "localField": "3",
    "foreignField": "4",
    "as": "a"
    }
    },
    { "$unwind": "$a" },
    {
    "$project": {
    "_id": "$a._id",
    "1": "$a.1",
    "2": "$3",
    "3": "$a.5",
    "4": "$a.6",
    "5": "$a.7",
    "6": "$4"
    }
    },
    {
    "$match": {
    "$expr": {
    "$or": [
    {
    "$and": [
    { "$eq": ["$4", false] },
    { "$gte": ["$3", "$6"] }
    ]
    },
    {
    "$and": [
    { "$eq": ["$4", true] },
    { "$lte": ["$3", "$6"] }
    ]
    }
    ]
    }
    }
    }
    ]
    return { collectionName, pipeline }
    }

    //all cryptos = 1525, all fiats = 32, unique alerts count = 42445 alerts count = 100000, time taken = 17.798 seconds
    //created indexes in the e1_alerts table for source, destination and source:destination combination
    function aggregateUniqueAlertsPipelineLastStage() {
    const collectionName = "e1_unique_alerts"
    const pipeline = [
    {
    "$lookup": {
    "from": "e1_sources",
    "localField": "1",
    "foreignField": "_id",
    "as": "s"
    }
    },
    { "$unwind": "$s" },
    {
    "$project": {

    "_id": 0,
    "1": 1,
    "2": 1,
    "3": 1,
    "4": "$s.1"
    }
    },
    {
    "$lookup": {
    "from": "e1_destinations",
    "localField": "2",
    "foreignField": "_id",
    "as": "d"
    }
    },
    { "$unwind": "$d" },
    { "$project": { "1": 1, "2": 1, "3": 1, "4": { "$multiply": ["$4", "$d.1"] } } },
    {
    "$lookup": {
    "from": "e1_alerts",
    "let": {
    "pair": "$3",
    "price": "$4"
    },
    "pipeline": [
    {
    "$match": {
    "$expr": {
    "$and": [
    {
    "$eq": ["$$pair", "$4"]
    },
    {
    "$or": [
    {
    "$and": [
    {
    "$eq": ["$6", false]
    },
    {
    "$gte": ["$5", "$$price"]
    }
    ]
    },
    {
    "$and": [
    {
    "$eq": ["$6", true]
    },
    {
    "$lte": ["$5", "$$price"]
    }
    ]
    }
    ]
    }
    ]
    }
    }
    }
    ],
    "as": "a"
    }
    },
    { "$unwind": "$a" },
    {
    "$project": {
    "_id": "$a._id",
    "1": "$a.1",
    "2": "$3",
    "3": "$a.5",
    "4": "$a.6",
    "5": "$a.7",
    "6": "$4",
    }
    }
    ]
    return { collectionName, pipeline }
    }

    //all cryptos = 1525, all fiats = 32, unique alerts count = 42445 alerts count = 100000, time taken = 24.5 seconds
    //created indexes in the e1_alerts table for source, destination and source:destination combination, pipelines = bad
    function aggregateUniqueAlertsPipelineAll() {
    const collectionName = "e1_unique_alerts"
    const pipeline = [
    {
    "$lookup": {
    "from": "e1_sources",
    "let": { source: "$1" },
    "pipeline": [
    {
    "$match": {
    "$expr": {
    "$eq": ["$_id", "$$source"]
    }
    }
    },
    {
    "$project": {
    "1": 1, "_id": 0
    }
    }
    ],
    "as": "s"
    }
    },
    { "$unwind": "$s" },
    {
    "$project": {
    "_id": 0,
    "1": 1,
    "2": 1,
    "3": "$s.1"
    }
    },
    {
    "$lookup": {
    "from": "e1_destinations",
    "let": { destination: "$2" },
    "pipeline": [
    {
    "$match": {
    "$expr": {
    "$eq": ["$_id", "$$destination"]
    }
    }
    },
    {
    "$project": {
    "_id": 0
    }
    }
    ],
    "as": "d"
    }
    },
    { "$unwind": "$d" },
    {
    "$project":
    {
    "1": 1,
    "2": 1,
    "3": { "$multiply": ["$3", "$d.1"] }
    }
    },
    {
    "$lookup": {
    "from": "e1_alerts",
    "let": { source: "$1", destination: "$2", price: "$3" },
    "pipeline": [
    {
    "$match": {
    "$expr": {
    "$and": [
    {
    "$eq": ["$$source", "$2"]
    },
    {
    "$eq": ["$$destination", "$3"]
    },
    {
    "$or": [
    {
    "$and": [
    {
    "$eq": ["$5", false]
    },
    {
    "$gte": ["$4", "$$price"]
    }
    ]
    },
    {
    "$and": [
    {
    "$eq": ["$5", true]
    },
    {
    "$lte": ["$4", "$$price"]
    }
    ]
    }
    ]
    }
    ]
    }
    }
    }
    ],
    "as": "a"
    }
    },
    { "$unwind": "$a" },
    {
    "$project": {
    "_id": "$a._id",
    "1": "$a.1",
    "2": "$1",
    "3": "$2",
    "4": "$a.4",
    "5": "$a.5",
    "6": "$a.6",
    "7": "$3"
    }
    }
    ]
    return { collectionName, pipeline }
    }

    function aggregateUniqueAlertsNoLookups() {
    const collectionName = "e1_unique_alerts"
    const pipeline = [
    {
    "$lookup": {
    "from": "e1_alerts",
    "localField": "_id",
    "foreignField": "4",
    "as": "a"
    }
    },
    {
    "$unwind": "$a"
    },
    {
    "$project": {
    "_id": "$a._id",
    "1": "$a.1",
    "2": "$a.4",
    "3": "$a.5",
    "4": "$a.6",
    "5": "$a.7",
    "6": "$3"
    }
    },
    {
    "$match": {
    "$expr": {
    "$or": [
    {
    "$and": [
    { "$eq": ["$4", false] },
    { "$gte": ["$3", "$6"] }
    ]
    },
    {
    "$and": [
    { "$eq": ["$4", true] },
    { "$lte": ["$3", "$6"] }
    ]
    }
    ]
    }
    }
    }
    ]
    return { collectionName, pipeline }
    }

    function aggregateUniqueAlertsCondensedNoLookups() {
    const collectionName = "e1_unique_alerts_condensed"
    const pipeline = [
    {
    "$lookup": {
    "from": "e1_alerts_condensed",
    "localField": "_id",
    "foreignField": "2",
    "as": "a"
    }
    },
    {
    "$unwind": "$a"
    },
    {
    "$project": {
    "_id": "$a._id",
    "1": "$a.1",
    "2": "$a.2",
    "3": "$a.3",
    "4": "$a.4",
    "5": "$a.5",
    "6": "$1"
    }
    },
    {
    "$match": {
    "$expr": {
    "$or": [
    {
    "$and": [
    { "$eq": ["$4", false] },
    { "$gte": ["$3", "$6"] }
    ]
    },
    {
    "$and": [
    { "$eq": ["$4", true] },
    { "$lte": ["$3", "$6"] }
    ]
    }
    ]
    }
    }
    }
    ]
    return { collectionName, pipeline }
    }

    async function refreshDatabase(refresh = false) {
    if (refresh) {
    let fiats = await fse.readJson("fiats.json")
    fiats = utils.normalizeDestinations(fiats)

    const cryptos = await fse.readJson("cmc.json")
    const sourceOperations = utils.getSourceOperations(cryptos)
    utils.upsertNative("e1_sources", sourceOperations)

    const destinationOperations = utils.getDestinationOperations(fiats)
    utils.upsertNative("e1_destinations", destinationOperations)

    const { alerts, unique, condensedAlerts, condensedUnique, query } = utils.getAlerts(cryptos, fiats, 10000, 1, 1)
    // fse.writeJson("alerts.json", alerts)
    // fse.writeJson("unique.json", unique)
    // fse.writeJson("condensed_alerts.json", condensedAlerts)
    // fse.writeJson("condensed_unique.json", condensedUnique)
    fse.writeJson("query.json", query, { spaces: 4 })
    utils.deleteInsert("e1_alerts", alerts)
    utils.deleteInsert("e1_unique_alerts", unique)
    utils.deleteInsert("e1_alerts_condensed", condensedAlerts)
    utils.deleteInsert("e1_unique_alerts_condensed", condensedUnique)
    setTimeout(() => {
    utils.queryAlerts("e1_alerts_condensed", query)
    }, 30000)

    }
    }

    (function () {

    refreshDatabase(true)
    const run1 = false
    const run2 = false
    if (run1) {
    const { collectionName, pipeline } = aggregateUniqueAlertsNoLookups()
    utils.aggregate(collectionName, "aggregation.json", pipeline)
    }

    if (run2) {
    const { collectionName, pipeline } = aggregateUniqueAlertsCondensedNoLookups()
    utils.aggregate(collectionName, "aggregation_condensed.json", pipeline)
    }
    })()