Skip to content

Instantly share code, notes, and snippets.

@Zaggen
Last active May 26, 2022 16:44
Show Gist options
  • Select an option

  • Save Zaggen/bbec82153f45ec60a136dca8e9ed65e7 to your computer and use it in GitHub Desktop.

Select an option

Save Zaggen/bbec82153f45ec60a136dca8e9ed65e7 to your computer and use it in GitHub Desktop.
Node.js load balancer with
/*
* Author: Zaggen
* github: https://github.com/Zaggen
* Free to use and modify
* */
const httpProxy = require('http-proxy')
const http = require('http')
const proxy = httpProxy.createProxyServer({})
const Promise = require('bluebird')
// REDIS
const redis = require('redis')
const redisConfig = require('./config/local.js').redisConfig
const redisClient = redis.createClient(redisConfig)
const port = 1337 // Same as the one configured on nginx
Promise.promisifyAll(redis.RedisClient.prototype)
// Queue system vars/consts
const MAX_ATTEMPS = 5
const REGISTRY_CHECK_TIME = 60000
const MAX_QUEUE_SIZE = 20
const queue = []
let serverIndex = 0
let serversList
(async function init() {
serversList = await redisClient.lrangeAsync('server-registry', 0, 100)
await validateRegistryAsync(serversList)
let lastRegistryCheck = (new Date()).getTime()
const server = http.createServer(async function(req, res) {
handleRequest(req, res, MAX_ATTEMPS)
})
async function handleRequest(req, res, retryAttemptsLeft) {
req.retryAttemptsLeft = retryAttemptsLeft
serversList = await redisClient.lrangeAsync('server-registry', 0, 100)
// If 60s has passed since the last request, we validate our registry
if(((new Date()).getTime() - lastRegistryCheck) > REGISTRY_CHECK_TIME){
serversList = validateRegistryAsync(serversList)
lastRegistryCheck = (new Date()).getTime()
}
// If we have some servers, we process the request
if(serversList.length > 0){
serverIndex = (serverIndex + 1) % serversList.length
const target = serversList[serverIndex]
// console.log('balancing request to: ', target);
proxy.web(req, res, {target})
}
else {
queueRequest(req, res)
}
}
function queueRequest(req, res) {
const {retryAttemptsLeft} = req
if(retryAttemptsLeft > 0){
if(queue.length >= MAX_QUEUE_SIZE) {
const oldestRequest = queue.shift()
sendBadGateWay(oldestRequest.res)
}
queue.push({req, res, retryAttemptsLeft: retryAttemptsLeft - 1})
const delayMultiplier = (MAX_ATTEMPS + 1) - retryAttemptsLeft
scheduleRetry(queue, delayMultiplier)
}
else {
sendBadGateWay(res)
}
}
function sendBadGateWay(res) {
res.statusCode = 502
res.statusMessage = 'Bad Gateway'
res.end(`
<body style="text-align: center; padding: 10px;">
<h1>502 Server Error</h1>
<hr/>
<p>
The server encountered a temporary error and could not complete your request.
Please try again later and if the problem persists then contact support and explain in detail how and when
the error occurred.
</p>
<p>Thank you for your kind understanding.</p>
</body>
`)
}
server.on('upgrade', async function(req, socket, head) {
// console.log('socket connection')
serversList = await redisClient.lrangeAsync('server-registry', 0, 100)
if(serversList.length > 0){
serverIndex = (serverIndex + 1) % serversList.length
const target = serversList[serverIndex]
// console.log('balancing request to: ', target);
proxy.ws(req, socket, head, {target});
}
})
proxy.on('error', function (err, req, res) {
// This will force a registry check on the next request
lastRegistryCheck -= REGISTRY_CHECK_TIME
if(err.code === 'ECONNREFUSED' && req.retryAttemptsLeft > 0){
queueRequest(req, res)
}
else
sendBadGateWay(res)
})
function scheduleRetry(queue, delayMultiplier) {
setTimeout(function() {
if(queue.length > 0){
const {req, res, retryAttemptsLeft} = queue.shift()
if(!res.socket.destroyed){
// console.log('retryAttemptsLeft', retryAttemptsLeft)
handleRequest(req, res, retryAttemptsLeft)
}
else {
// CONNECTION RESET BY CLIENT
res.end()
}
}
}, 200 * delayMultiplier)
}
async function validateRegistryAsync(serversList) {
const updatedServerList = []
for (let server of serversList) {
console.log('iterating')
const isAlive = await testEndPoint(server)
if(!isAlive)
redisClient.lremAsync('server-registry', 0, server)
else
updatedServerList.push(server)
}
return updatedServerList
}
async function testEndPoint(url) {
return new Promise((resolve)=> {
try {
http.get(url, ()=> {
resolve(true)
}).on('error', (e) => {
resolve(false)
});
} catch(err){
resolve(false)
}
})
}
server.listen(port, ()=> {
console.log(`Load balancer listening on port ${port}`)
})
})()
@Zaggen
Copy link
Author

Zaggen commented Mar 8, 2017

  • On Server start call redisClient.rpush.apply(redisClient, ['server-registry', serverUrl]) (Make sure is not already there or use a set instead)
  • On server shutdown call redisClient.lrem('server-registry', 0, serverUrl)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment