/* * Author: Zaggen - 2017 * version: 0.1 * github: https://github.com/Zaggen * Free to use and modify * */ const httpProxy = require('http-proxy') const http = require('http') const proxy = httpProxy.createProxyServer({}) const Promise = require('bluebird') // REDIS const redis = require('redis') const redisConfig = require('./config/local.js').redisConfig const redisClient = redis.createClient(redisConfig) const port = 1337 // Same as the one configured on nginx Promise.promisifyAll(redis.RedisClient.prototype) // Queue system vars/consts const MAX_ATTEMPS = 5 const REGISTRY_CHECK_TIME = 10000 const MAX_QUEUE_SIZE = 20 const queue = [] let serverIndex = 0 let serversList (async function init() { serversList = await redisClient.lrangeAsync('server-registry', 0, 100) await validateRegistryAsync(serversList) let lastRegistryCheck = (new Date()).getTime() const server = http.createServer(async function(req, res) { handleRequest(req, res, MAX_ATTEMPS) }) async function handleRequest(req, res, retryAttemptsLeft) { if(validationLock.isActive) await validationLock.waitForRelease() req.retryAttemptsLeft = retryAttemptsLeft // If 60s has passed since the last request, we validate our registry if(((new Date()).getTime() - lastRegistryCheck) > REGISTRY_CHECK_TIME){ validationLock.acquire() serversList = await validateRegistryAsync(await redisClient.lrangeAsync('server-registry', 0, 100)) lastRegistryCheck = (new Date()).getTime() validationLock.release() } // If we have some servers, we process the request if(serversList.length > 0){ serverIndex = (serverIndex + 1) % serversList.length const target = serversList[serverIndex] // console.log('balancing request to: ', target); proxy.web(req, res, {target}) } else { queueRequest(req, res) } } const validationLock = { _isActive: false, get isActive(){ return this._isActive}, set isActive(state){ this._isActive = state if(state === false) this._notifyRelease() }, acquire(){ this.isActive = true }, release(){ this.isActive = false }, async waitForRelease(){ return new Promise((resolve)=> { this._onRelease(resolve) }) }, _notifyRelease(){ let cb while(cb = this._listeners.shift()){ cb() } }, _listeners: [], _onRelease(cb){ this._listeners.push(cb) } } function queueRequest(req, res) { const {retryAttemptsLeft} = req // console.log(`queueRequest -> retryAttemptsLeft: ${retryAttemptsLeft}`) if(retryAttemptsLeft > 0){ if(queue.length >= MAX_QUEUE_SIZE) { const oldestRequest = queue.shift() sendBadGateWay(oldestRequest.res) } queue.push({req, res, retryAttemptsLeft: retryAttemptsLeft - 1}) const delayMultiplier = (MAX_ATTEMPS + 1) - retryAttemptsLeft scheduleRetry(queue, delayMultiplier) } else { sendBadGateWay(res) } } function sendBadGateWay(res) { res.statusCode = 502 res.statusMessage = 'Bad Gateway' res.end(`

502 Server Error


The server encountered a temporary error and could not complete your request. Please try again later and if the problem persists then contact support and explain in detail how and when the error occurred.

Thank you for your kind understanding.

`) } server.on('upgrade', async function(req, socket, head) { // console.log('socket connection') serversList = await redisClient.lrangeAsync('server-registry', 0, 100) if(serversList.length > 0){ serverIndex = (serverIndex + 1) % serversList.length const target = serversList[serverIndex] // console.log('balancing request to: ', target); proxy.ws(req, socket, head, {target}); } }) proxy.on('error', function (err, req, res) { // This will force a registry check on the next request lastRegistryCheck -= REGISTRY_CHECK_TIME console.error('Error') console.error(err) if(err.code === 'ECONNREFUSED' && req.retryAttemptsLeft === MAX_ATTEMPS){ handleRequest(req, res, MAX_ATTEMPS - 1) } else if(err.code === 'ECONNREFUSED' && req.retryAttemptsLeft > 0){ queueRequest(req, res) } else sendBadGateWay(res) }) function scheduleRetry(queue, delayMultiplier) { // console.log(`scheduleRetry`) setTimeout(function() { if(queue.length > 0){ const {req, res, retryAttemptsLeft} = queue.shift() if(!res.socket.destroyed){ // console.log('retryAttemptsLeft', retryAttemptsLeft) handleRequest(req, res, retryAttemptsLeft) } else { // CONNECTION RESET BY CLIENT res.end() } } }, 200 * delayMultiplier) } async function validateRegistryAsync(serversList) { const updatedServerList = [] for (let server of serversList) { const isAlive = await testEndPoint(server) if(!isAlive){ redisClient.lremAsync('server-registry', 0, server) // console.log("REMOVED SERVER FROM REGISTRY") } else updatedServerList.push(server) } return updatedServerList } async function testEndPoint(url) { return new Promise((resolve)=> { try { http.get(url, ()=> { resolve(true) }).on('error', (err) => { if(err.code === 'ECONNREFUSED') resolve(false) else resolve(true) }); } catch(err){ resolve(false) } }) } server.listen(port, ()=> { console.log(`Load balancer listening on port ${port}`) }) })()