Skip to content

Instantly share code, notes, and snippets.

@Zaggen
Last active May 26, 2022 16:44
Show Gist options
  • Save Zaggen/bbec82153f45ec60a136dca8e9ed65e7 to your computer and use it in GitHub Desktop.
Save Zaggen/bbec82153f45ec60a136dca8e9ed65e7 to your computer and use it in GitHub Desktop.

Revisions

  1. Zaggen revised this gist Mar 16, 2017. 1 changed file with 0 additions and 6 deletions.
    6 changes: 0 additions & 6 deletions load-balancer.js
    Original file line number Diff line number Diff line change
    @@ -1,9 +1,3 @@
    /*
    * Author: Zaggen
    * github: https://github.com/Zaggen
    * Free to use and modify
    * */

    /*
    * Author: Zaggen - 2017
    * version: 0.1
  2. Zaggen revised this gist Mar 16, 2017. 1 changed file with 62 additions and 8 deletions.
    70 changes: 62 additions & 8 deletions load-balancer.js
    Original file line number Diff line number Diff line change
    @@ -4,6 +4,13 @@
    * Free to use and modify
    * */

    /*
    * Author: Zaggen - 2017
    * version: 0.1
    * github: https://github.com/Zaggen
    * Free to use and modify
    * */

    const httpProxy = require('http-proxy')
    const http = require('http')
    const proxy = httpProxy.createProxyServer({})
    @@ -16,7 +23,7 @@ const port = 1337 // Same as the one configured on nginx
    Promise.promisifyAll(redis.RedisClient.prototype)
    // Queue system vars/consts
    const MAX_ATTEMPS = 5
    const REGISTRY_CHECK_TIME = 60000
    const REGISTRY_CHECK_TIME = 10000
    const MAX_QUEUE_SIZE = 20
    const queue = []

    @@ -33,12 +40,16 @@ let serversList
    })

    async function handleRequest(req, res, retryAttemptsLeft) {
    if(validationLock.isActive)
    await validationLock.waitForRelease()

    req.retryAttemptsLeft = retryAttemptsLeft
    serversList = await redisClient.lrangeAsync('server-registry', 0, 100)
    // If 60s has passed since the last request, we validate our registry
    if(((new Date()).getTime() - lastRegistryCheck) > REGISTRY_CHECK_TIME){
    serversList = validateRegistryAsync(serversList)
    validationLock.acquire()
    serversList = await validateRegistryAsync(await redisClient.lrangeAsync('server-registry', 0, 100))
    lastRegistryCheck = (new Date()).getTime()
    validationLock.release()
    }
    // If we have some servers, we process the request
    if(serversList.length > 0){
    @@ -52,8 +63,40 @@ let serversList
    }
    }

    const validationLock = {
    _isActive: false,
    get isActive(){ return this._isActive},
    set isActive(state){
    this._isActive = state
    if(state === false)
    this._notifyRelease()
    },
    acquire(){
    this.isActive = true
    },
    release(){
    this.isActive = false
    },
    async waitForRelease(){
    return new Promise((resolve)=> {
    this._onRelease(resolve)
    })
    },
    _notifyRelease(){
    let cb
    while(cb = this._listeners.shift()){
    cb()
    }
    },
    _listeners: [],
    _onRelease(cb){
    this._listeners.push(cb)
    }
    }

    function queueRequest(req, res) {
    const {retryAttemptsLeft} = req
    // console.log(`queueRequest -> retryAttemptsLeft: ${retryAttemptsLeft}`)
    if(retryAttemptsLeft > 0){
    if(queue.length >= MAX_QUEUE_SIZE) {
    const oldestRequest = queue.shift()
    @@ -98,14 +141,20 @@ let serversList
    proxy.on('error', function (err, req, res) {
    // This will force a registry check on the next request
    lastRegistryCheck -= REGISTRY_CHECK_TIME
    if(err.code === 'ECONNREFUSED' && req.retryAttemptsLeft > 0){
    console.error('Error')
    console.error(err)
    if(err.code === 'ECONNREFUSED' && req.retryAttemptsLeft === MAX_ATTEMPS){
    handleRequest(req, res, MAX_ATTEMPS - 1)
    }
    else if(err.code === 'ECONNREFUSED' && req.retryAttemptsLeft > 0){
    queueRequest(req, res)
    }
    else
    sendBadGateWay(res)
    })

    function scheduleRetry(queue, delayMultiplier) {
    // console.log(`scheduleRetry`)
    setTimeout(function() {
    if(queue.length > 0){
    const {req, res, retryAttemptsLeft} = queue.shift()
    @@ -124,10 +173,11 @@ let serversList
    async function validateRegistryAsync(serversList) {
    const updatedServerList = []
    for (let server of serversList) {
    console.log('iterating')
    const isAlive = await testEndPoint(server)
    if(!isAlive)
    if(!isAlive){
    redisClient.lremAsync('server-registry', 0, server)
    // console.log("REMOVED SERVER FROM REGISTRY")
    }
    else
    updatedServerList.push(server)
    }
    @@ -139,8 +189,11 @@ let serversList
    try {
    http.get(url, ()=> {
    resolve(true)
    }).on('error', (e) => {
    resolve(false)
    }).on('error', (err) => {
    if(err.code === 'ECONNREFUSED')
    resolve(false)
    else
    resolve(true)
    });
    } catch(err){
    resolve(false)
    @@ -152,3 +205,4 @@ let serversList
    console.log(`Load balancer listening on port ${port}`)
    })
    })()

  3. Zaggen created this gist Mar 8, 2017.
    154 changes: 154 additions & 0 deletions load-balancer.js
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,154 @@
    /*
    * Author: Zaggen
    * github: https://github.com/Zaggen
    * Free to use and modify
    * */

    const httpProxy = require('http-proxy')
    const http = require('http')
    const proxy = httpProxy.createProxyServer({})
    const Promise = require('bluebird')
    // REDIS
    const redis = require('redis')
    const redisConfig = require('./config/local.js').redisConfig
    const redisClient = redis.createClient(redisConfig)
    const port = 1337 // Same as the one configured on nginx
    Promise.promisifyAll(redis.RedisClient.prototype)
    // Queue system vars/consts
    const MAX_ATTEMPS = 5
    const REGISTRY_CHECK_TIME = 60000
    const MAX_QUEUE_SIZE = 20
    const queue = []

    let serverIndex = 0
    let serversList

    (async function init() {
    serversList = await redisClient.lrangeAsync('server-registry', 0, 100)
    await validateRegistryAsync(serversList)
    let lastRegistryCheck = (new Date()).getTime()

    const server = http.createServer(async function(req, res) {
    handleRequest(req, res, MAX_ATTEMPS)
    })

    async function handleRequest(req, res, retryAttemptsLeft) {
    req.retryAttemptsLeft = retryAttemptsLeft
    serversList = await redisClient.lrangeAsync('server-registry', 0, 100)
    // If 60s has passed since the last request, we validate our registry
    if(((new Date()).getTime() - lastRegistryCheck) > REGISTRY_CHECK_TIME){
    serversList = validateRegistryAsync(serversList)
    lastRegistryCheck = (new Date()).getTime()
    }
    // If we have some servers, we process the request
    if(serversList.length > 0){
    serverIndex = (serverIndex + 1) % serversList.length
    const target = serversList[serverIndex]
    // console.log('balancing request to: ', target);
    proxy.web(req, res, {target})
    }
    else {
    queueRequest(req, res)
    }
    }

    function queueRequest(req, res) {
    const {retryAttemptsLeft} = req
    if(retryAttemptsLeft > 0){
    if(queue.length >= MAX_QUEUE_SIZE) {
    const oldestRequest = queue.shift()
    sendBadGateWay(oldestRequest.res)
    }
    queue.push({req, res, retryAttemptsLeft: retryAttemptsLeft - 1})
    const delayMultiplier = (MAX_ATTEMPS + 1) - retryAttemptsLeft
    scheduleRetry(queue, delayMultiplier)
    }
    else {
    sendBadGateWay(res)
    }
    }

    function sendBadGateWay(res) {
    res.statusCode = 502
    res.statusMessage = 'Bad Gateway'
    res.end(`
    <body style="text-align: center; padding: 10px;">
    <h1>502 Server Error</h1>
    <hr/>
    <p>
    The server encountered a temporary error and could not complete your request.
    Please try again later and if the problem persists then contact support and explain in detail how and when
    the error occurred.
    </p>
    <p>Thank you for your kind understanding.</p>
    </body>
    `)
    }

    server.on('upgrade', async function(req, socket, head) {
    // console.log('socket connection')
    serversList = await redisClient.lrangeAsync('server-registry', 0, 100)
    if(serversList.length > 0){
    serverIndex = (serverIndex + 1) % serversList.length
    const target = serversList[serverIndex]
    // console.log('balancing request to: ', target);
    proxy.ws(req, socket, head, {target});
    }
    })
    proxy.on('error', function (err, req, res) {
    // This will force a registry check on the next request
    lastRegistryCheck -= REGISTRY_CHECK_TIME
    if(err.code === 'ECONNREFUSED' && req.retryAttemptsLeft > 0){
    queueRequest(req, res)
    }
    else
    sendBadGateWay(res)
    })

    function scheduleRetry(queue, delayMultiplier) {
    setTimeout(function() {
    if(queue.length > 0){
    const {req, res, retryAttemptsLeft} = queue.shift()
    if(!res.socket.destroyed){
    // console.log('retryAttemptsLeft', retryAttemptsLeft)
    handleRequest(req, res, retryAttemptsLeft)
    }
    else {
    // CONNECTION RESET BY CLIENT
    res.end()
    }
    }
    }, 200 * delayMultiplier)
    }

    async function validateRegistryAsync(serversList) {
    const updatedServerList = []
    for (let server of serversList) {
    console.log('iterating')
    const isAlive = await testEndPoint(server)
    if(!isAlive)
    redisClient.lremAsync('server-registry', 0, server)
    else
    updatedServerList.push(server)
    }
    return updatedServerList
    }

    async function testEndPoint(url) {
    return new Promise((resolve)=> {
    try {
    http.get(url, ()=> {
    resolve(true)
    }).on('error', (e) => {
    resolve(false)
    });
    } catch(err){
    resolve(false)
    }
    })
    }

    server.listen(port, ()=> {
    console.log(`Load balancer listening on port ${port}`)
    })
    })()