/* * Author: Zaggen * github: https://github.com/Zaggen * Free to use and modify * */ const httpProxy = require('http-proxy') const http = require('http') const proxy = httpProxy.createProxyServer({}) const Promise = require('bluebird') // REDIS const redis = require('redis') const redisConfig = require('./config/local.js').redisConfig const redisClient = redis.createClient(redisConfig) const port = 1337 // Same as the one configured on nginx Promise.promisifyAll(redis.RedisClient.prototype) // Queue system vars/consts const MAX_ATTEMPS = 5 const REGISTRY_CHECK_TIME = 60000 const MAX_QUEUE_SIZE = 20 const queue = [] let serverIndex = 0 let serversList (async function init() { serversList = await redisClient.lrangeAsync('server-registry', 0, 100) await validateRegistryAsync(serversList) let lastRegistryCheck = (new Date()).getTime() const server = http.createServer(async function(req, res) { handleRequest(req, res, MAX_ATTEMPS) }) async function handleRequest(req, res, retryAttemptsLeft) { req.retryAttemptsLeft = retryAttemptsLeft serversList = await redisClient.lrangeAsync('server-registry', 0, 100) // If 60s has passed since the last request, we validate our registry if(((new Date()).getTime() - lastRegistryCheck) > REGISTRY_CHECK_TIME){ serversList = validateRegistryAsync(serversList) lastRegistryCheck = (new Date()).getTime() } // If we have some servers, we process the request if(serversList.length > 0){ serverIndex = (serverIndex + 1) % serversList.length const target = serversList[serverIndex] // console.log('balancing request to: ', target); proxy.web(req, res, {target}) } else { queueRequest(req, res) } } function queueRequest(req, res) { const {retryAttemptsLeft} = req if(retryAttemptsLeft > 0){ if(queue.length >= MAX_QUEUE_SIZE) { const oldestRequest = queue.shift() sendBadGateWay(oldestRequest.res) } queue.push({req, res, retryAttemptsLeft: retryAttemptsLeft - 1}) const delayMultiplier = (MAX_ATTEMPS + 1) - retryAttemptsLeft scheduleRetry(queue, delayMultiplier) } else { sendBadGateWay(res) } } function sendBadGateWay(res) { res.statusCode = 502 res.statusMessage = 'Bad Gateway' res.end(`
The server encountered a temporary error and could not complete your request. Please try again later and if the problem persists then contact support and explain in detail how and when the error occurred.
Thank you for your kind understanding.
`) } server.on('upgrade', async function(req, socket, head) { // console.log('socket connection') serversList = await redisClient.lrangeAsync('server-registry', 0, 100) if(serversList.length > 0){ serverIndex = (serverIndex + 1) % serversList.length const target = serversList[serverIndex] // console.log('balancing request to: ', target); proxy.ws(req, socket, head, {target}); } }) proxy.on('error', function (err, req, res) { // This will force a registry check on the next request lastRegistryCheck -= REGISTRY_CHECK_TIME if(err.code === 'ECONNREFUSED' && req.retryAttemptsLeft > 0){ queueRequest(req, res) } else sendBadGateWay(res) }) function scheduleRetry(queue, delayMultiplier) { setTimeout(function() { if(queue.length > 0){ const {req, res, retryAttemptsLeft} = queue.shift() if(!res.socket.destroyed){ // console.log('retryAttemptsLeft', retryAttemptsLeft) handleRequest(req, res, retryAttemptsLeft) } else { // CONNECTION RESET BY CLIENT res.end() } } }, 200 * delayMultiplier) } async function validateRegistryAsync(serversList) { const updatedServerList = [] for (let server of serversList) { console.log('iterating') const isAlive = await testEndPoint(server) if(!isAlive) redisClient.lremAsync('server-registry', 0, server) else updatedServerList.push(server) } return updatedServerList } async function testEndPoint(url) { return new Promise((resolve)=> { try { http.get(url, ()=> { resolve(true) }).on('error', (e) => { resolve(false) }); } catch(err){ resolve(false) } }) } server.listen(port, ()=> { console.log(`Load balancer listening on port ${port}`) }) })()