Skip to content

Instantly share code, notes, and snippets.

@RobDoan
Forked from fabware/application.js
Created September 27, 2011 06:10
Show Gist options
  • Save RobDoan/1244447 to your computer and use it in GitHub Desktop.
Save RobDoan/1244447 to your computer and use it in GitHub Desktop.

Revisions

  1. @fabware fabware created this gist Jun 8, 2011.
    249 changes: 249 additions & 0 deletions application.js
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,249 @@
    var express = require('express');
    var redis = require('redis');

    const serverType = process.argv[2];
    const serverHost = process.argv[3];
    const serverPort = parseInt(process.argv[4]);

    const redisPort = 6379;
    const redisHost = '127.0.0.1';

    var rclient = new redis.createClient(redisPort, redisHost);

    function writeUserMessageHost(userId, host){
    rclient.set('MUPH:'+userId, host);
    }

    function deleteUserMessageHost(userId){
    rclient.delete('MUPH:'+userId);
    }

    function writeChannelMessageHost(channel, host){
    rclient.sadd('MCMH:'+channel, host);
    }

    function ListenerContainer(){
    this.listeners = {};
    this.listenersCount = 0;
    };

    ListenerContainer.prototype.addClient = function(client){
    this.listeners[client.listenerId] = client;
    this.listenersCount += 1;
    writeUserMessageHost(client.listenerId, serverHost+':'+serverPort);
    };

    ListenerContainer.prototype.removeClient = function(client){
    delete this.listeners[client.listenerId];
    this.listenersCount -= 1;
    deleteUserMessageHost(client.listenerId, serverHost+':'+serverPort);
    };

    ListenerContainer.prototype.getClients = function(listenerId){
    var client = this.listeners[listenerId];
    if (client != null){
    return {listenerId: client};
    }else{
    return {};
    };
    };

    ListenerContainer.prototype.count = function(listenerId){
    return this.listenersCount;
    1,17 Top
    ListenerContainer.prototype.clientsCount = function(){
    // find first listener
    var firstListener = null;
    for (listenerId in this.listeners){
    var firstListener = this.listeners[listenerId].listener;
    break;
    };
    var listenerCount = 0;
    if (firstListener){
    for(client in firstListener.clients){
    listenerCount += 1;
    };
    };
    return listenerCount;
    };

    function MultiListenerContainer() {
    this.listeners = {};
    this.listenersCount = {'__total': 0};
    };

    MultiListenerContainer.prototype.addClient = function(client){
    // console.log('add client to channel:', client.listenerId, client.sessionId);
    if (client.listenerId in this.listeners){
    if(!(client.sessionId in this.listeners[client.listenerId])){
    this.listeners[client.listenerId][client.sessionId] = client;
    this.listenersCount[client.listenerId] += 1;
    this.listenersCount['__total'] += 1;
    };

    }else{
    this.listeners[client.listenerId] = {};
    this.listeners[client.listenerId][client.sessionId] = client;
    this.listenersCount[client.listenerId] = 1;
    this.listenersCount['__total'] += 1;
    // console.log('init client', client.sessionId);
    };
    writeChannelMessageHost(client.listenerId, serverHost+':'+serverPort);
    };

    MultiListenerContainer.prototype.removeClient = function(client){
    if (client.listenerId in this.listeners){
    if (client.sessionId in this.listeners[client.listenerId]){
    delete this.listeners[client.listenerId][client.sessionId];
    this.listenersCount[client.listenerId] -= 1;
    this.listenersCount['__total'] -= 1;
    }
    }
    };
    MultiListenerContainer.prototype.getClients = function(listenerId){
    if (listenerId in this.listeners){
    var clients = this.listeners[listenerId];
    if (clients){
    return clients;
    }else{
    return {}
    };
    }

    };

    MultiListenerContainer.prototype.count = function(listenerId){
    if (listenerId){
    if (listenerId in this.listenersCount){
    return this.listenersCount[listenerId];
    }else{
    return 0;
    }
    }else{
    return this.listenersCount;
    };
    };

    MultiListenerContainer.prototype.clientsCount = function(){
    // find first listener
    var firstListener = null;
    for (listenerId in this.listeners){
    for (sessionId in this.listeners[listenerId]){
    var firstListener = this.listeners[listenerId][sessionId].listener;
    break;
    };
    };
    var listenerCount = 0;
    if (firstListener){
    for(client in firstListener.clients){
    listenerCount += 1;
    };
    };
    return listenerCount;
    };

    var userContainer = new ListenerContainer();
    var channelContainer = new MultiListenerContainer();

    var server = module.exports = express.createServer();
    server.get('/post/:msg_target/:msg_target_id/:msg', function(req, res){
    var sentCounter = 0;
    if (req.params.msg_target == 'channel'){
    var container = channelContainer;
    }else if(req.params.msg_target == 'user')
    {
    var container = userContainer;
    }
    var clients = container.getClients(req.params.msg_target_id);
    for(listenerId in clients){
    var client = clients[listenerId];
    var msgBuf = new Buffer(req.params.msg, 'base64');
    var msgStr = msgBuf.toString('utf-8');
    client.send(msgStr);
    sentCounter += 1;
    }
    res.send(''+sentCounter);
    });

    server.get('/stats/count', function(req, res){
    var result = {};
    result['user'] = userContainer.count();
    result['channel'] = channelContainer.count();
    result['user_clients'] = userContainer.clientsCount();
    result['channel_clients'] = channelContainer.clientsCount();
    res.send(JSON.stringify(result));
    });
    server.get('/debug/check/session/:session_id', function(req, res){
    var found = 'not found';
    for(userId in userContainer.listeners){
    var client = userContainer.listeners[userId];
    if (client.sessionId == req.params.session_id){
    found = 'found in user: '+userId;
    }
    };
    for(channel in channelContainer.listeners){
    for(sessionId in channelContainer.listeners[channel]){
    if(sessionId == req.params.session_id){
    found = 'found in channel: '+channel;
    }
    }
    }
    res.send(found);
    });

    function getRequestExtra(url){
    return url.split('/')[2];
    };


    const io = require('socket.io');

    function serveChannelMessage(){
    console.log('starting channel server...');
    const socket = io.listen(server, {'resource': 'channelmsg',
    'flashPolicyServer': false,
    'transports': ['flashsocket']});

    socket.on('connection', function(client){
    // console.dir(channelContainer);
    var extra = getRequestExtra(client.request.url);

    var extraParts = extra.split('_');
    client.listenerId = extraParts[0];
    var onlineToken = extraParts[1];
    channelContainer.addClient(client);

    client.on('message', function(msg){
    });

    client.on('disconnect', function(){
    channelContainer.removeClient(this);
    });
    });
    };
    function serveUserMessage(){
    const socket = io.listen(server, {'resource': 'usermsg',
    'flashPolicyServer': false,
    'transports': ['flashsocket']});

    socket.on('connection', function(client){
    client.listenerId = getRequestExtra(client.request.url);
    userContainer.addClient(client);

    client.on('message', function(msg){
    });

    client.on('disconnect', function(){
    userContainer.removeClient(this);
    });
    });

    };

    if(serverType == 'user'){
    serveUserMessage();
    }else if(serverType == 'channel'){
    serveChannelMessage();
    }

    server.listen(serverPort, serverHost);
    73 changes: 73 additions & 0 deletions haproxy.conf
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,73 @@
    global
    # daemon
    maxconn 200000
    pidfile /var/run/haproxy.pid
    user nobody
    group nobody
    # debug

    defaults
    mode http
    # option http-server-close
    # option http-pretend-keepalive
    # option httplog
    stats uri /haproxy_stats

    frontend flashpolicy 0.0.0.0:843
    mode tcp
    maxconn 20000
    timeout client 86400000
    default_backend flashpolicy_servers

    backend flashpolicy_servers
    mode tcp
    balance roundrobin
    timeout server 86400000
    timeout connect 86400000
    server fp1 10.0.0.55:10000 weight 1 maxconn 4000 check
    server fp2 10.0.0.55:10001 weight 1 maxconn 4000 check
    server fp3 10.0.0.55:10002 weight 1 maxconn 4000 check
    server fp4 10.0.0.55:10003 weight 1 maxconn 4000 check
    server fp5 10.0.0.55:10004 weight 1 maxconn 4000 check

    frontend all 0.0.0.0:30000
    maxconn 200000
    timeout client 86400000
    acl is_usermsg_stream path_dir usermsg
    use_backend usermsg_stream_servers if is_usermsg_stream
    acl is_channelmsg_stream path_dir channelmsg
    use_backend channelmsg_stream_servers if is_channelmsg_stream

    backend usermsg_stream_servers
    balance roundrobin
    option forwardfor
    timeout queue 5000
    timeout server 86400000
    timeout connect 86400000
    server stream20 10.0.0.55:40001 weight 1 maxconn 10000 check
    server stream21 10.0.0.55:40002 weight 1 maxconn 10000 check
    #server stream22 10.0.0.55:40003 weight 1 maxconn 10000 check
    #server stream23 10.0.0.55:40004 weight 1 maxconn 10000 check
    #server stream24 10.0.0.55:40005 weight 1 maxconn 10000 check
    #server stream25 10.0.0.55:40006 weight 1 maxconn 10000 check
    #server stream26 10.0.0.55:40007 weight 1 maxconn 10000 check
    #server stream27 10.0.0.55:40008 weight 1 maxconn 10000 check
    #server stream28 10.0.0.55:40009 weight 1 maxconn 10000 check
    #server stream29 10.0.0.55:40010 weight 1 maxconn 10000 check

    backend channelmsg_stream_servers
    balance roundrobin
    option forwardfor
    timeout queue 5000
    timeout server 86400000
    timeout connect 86400000
    server stream30 10.0.0.55:50001 weight 1 maxconn 20000 check
    server stream31 10.0.0.55:50002 weight 1 maxconn 20000 check
    #server stream32 10.0.0.55:50003 weight 1 maxconn 10000 check
    #server stream33 10.0.0.55:50004 weight 1 maxconn 10000 check
    #server stream34 10.0.0.55:50005 weight 1 maxconn 10000 check
    #server stream35 10.0.0.55:50006 weight 1 maxconn 10000 check
    #server stream36 10.0.0.55:50007 weight 1 maxconn 10000 check
    #server stream37 10.0.0.55:50008 weight 1 maxconn 10000 check
    #server stream38 10.0.0.55:50009 weight 1 maxconn 10000 check
    #server stream39 10.0.0.55:50010 weight 1 maxconn 10000 check