Skip to content

Instantly share code, notes, and snippets.

@jedisct1
Created March 22, 2011 08:35
Show Gist options
  • Save jedisct1/880931 to your computer and use it in GitHub Desktop.
Save jedisct1/880931 to your computer and use it in GitHub Desktop.

Revisions

  1. jedisct1 revised this gist May 17, 2011. No changes.
  2. jedisct1 revised this gist May 17, 2011. 1 changed file with 266 additions and 0 deletions.
    266 changes: 266 additions & 0 deletions Recommendation engine using MapReduce, Kaffeine, NodeJS and Redis.js
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,266 @@
    sys = require "sys";
    http = require "http";
    url = require "url";
    connect = require "connect";
    redis = require "redis";
    hashRing = require "hash_ring";

    // Sharding: consistent-hash routing of keys onto nodes.
    // Every ring entry is one string of the form "<host> <http_port> <redis_port>".
    // The value attached to an entry in `servers` is handed to hash_ring as-is
    // (presumably the node weight -- confirm against the hash_ring documentation).
    Sharding = {
        servers = { "127.0.0.1 10000 6378": 1 };
        ring = new hashRing.HashRing servers;

        // Splits a ring entry into { host, http_port, redis_port }.
        // Ports are parsed with an explicit radix so they are always decimal.
        function getComponentsForEntry(entry) {
            entry_ = entry.split " ";
            return {
                host: entry_[0],
                http_port: parseInt(entry_[1], 10),
                redis_port: parseInt(entry_[2], 10)
            };
        }

        // Returns the parsed ring entry that owns `key`.
        function getNodeForKey(key) {
            getComponentsForEntry ring.getNode key;
        }

        // Returns { host, port } of the Redis instance that owns `key`.
        function getRedisNodeForKey(key) {
            entry = getNodeForKey key;
            return {
                host: entry.host,
                port: entry.redis_port
            };
        }

        // Returns { host, port } of the HTTP server that owns `key`.
        function getHTTPNodeForKey(key) {
            entry = getNodeForKey key;
            return {
                host: entry.host,
                port: entry.http_port
            };
        }

        // Returns the Redis node for a key or for a list of keys; for a list,
        // the first key decides. Throws the string "keys.length <= 0" when
        // given an empty list.
        function getLocalRedisNode(keys) {
            // `typeof x` is never "Array" (arrays report "object"), so the
            // previous test wrapped arrays a second time and looked up the
            // whole array instead of its first key.
            Array.isArray(keys) || (keys = [keys]);
            if keys.length <= 0, throw "keys.length <= 0";
            getRedisNodeForKey keys[0];
        }

        // Groups `items` by the ring entry that owns each of them and returns
        // the groups as an array of arrays (one inner array per entry).
        function splitToShards(items) {
            shards = {};
            items.forEach((item) {
                entry = ring.getNode item;
                shards[entry] ||= [];
                shards[entry].push(item);
            });
            shards_ = [];
            for entry in shards {
                shards_.push shards[entry];
                delete shards[entry];
            }
            shards_;
        }

        return {
            "getRedisNodeForKey": getRedisNodeForKey,
            "getHTTPNodeForKey": getHTTPNodeForKey,
            "getLocalRedisNode": getLocalRedisNode,
            "splitToShards": splitToShards
        };
    }();

    // RedisPool: keeps one Redis client per "host:port" and hands the same
    // client back on every later request for that node.
    RedisPool = {
        pool = { };

        // Returns the pooled client for `node`, creating and storing it on
        // first use.
        function _returnNode(node) {
            cacheKey = "#{node.host}:#{node.port}";
            cached = pool[cacheKey];
            if cached, return cached;
            return pool[cacheKey] = redis.createClient(node.port, node.host);
        }

        // Client for the Redis node that owns `key`.
        function getRedisClientForKey(key) {
            return _returnNode(Sharding.getRedisNodeForKey(key));
        }

        // Client for the Redis node selected by Sharding.getLocalRedisNode.
        function getLocalRedisClient(keys) {
            return _returnNode(Sharding.getLocalRedisNode(keys));
        }

        return {
            "getRedisClientForKey": getRedisClientForKey,
            "getLocalRedisClient": getLocalRedisClient
        };
    }();

    // Recommendation logic for `userId`, given the ids it is already related to.
    //   getIRelatedUsersIds(cb) -- map step: counts how often each candidate id
    //                              appears in the Redis SORT results of the
    //                              related users' keys, then calls cb(counts).
    //   reduce(counts, cb)      -- reduce step: filters, ranks and truncates the
    //                              candidates, then calls cb(ids).
    function Recommendation(userId, relatedUsersIds) {
        iRelatedAccumulator = { }, cutoff = 1, limit = 50;

        // Keeps candidates whose count is strictly greater than `cutoff`,
        // orders them by descending count and passes at most `limit` ids to cb.
        function reduce(iRelatedAccumulator, cb) {
            contenders = [];
            for iRelatedUserId in iRelatedAccumulator {
                count = iRelatedAccumulator[iRelatedUserId];
                if count <= cutoff, continue;
                contenders.push [iRelatedUserId, count];
            }
            iRelatedAccumulator = null;
            contenders.sort (a, b) { b[1] - a[1] };
            // slice's end index is exclusive: `limit - 1` returned one id too few.
            contenders = contenders.slice(0, limit).map((a) { a[0] });
            cb contenders;
        }

        // Adds every id found in `sres` (an array of rows, one per related
        // user) to the accumulator, skipping ids that are already related and
        // finally removing the user itself.
        function countIRelatedUsersIds(sres, cb) {
            sres.forEach((row) {
                row.forEach((iRelatedUserId) {
                    if relatedUsersIds.indexOf(iRelatedUserId) === -1 {
                        iRelatedAccumulator[iRelatedUserId] ||= 0;
                        iRelatedAccumulator[iRelatedUserId]++;
                    }
                });
            });
            delete iRelatedAccumulator[userId];
            cb iRelatedAccumulator;
        }

        // Queues one SORT per related user in a single MULTI on the Redis
        // client chosen from `relatedUsersIds`, then counts the replies.
        function getIRelatedUsersIds(cb) {
            if relatedUsersIds.length <= 0 {
                countIRelatedUsersIds [], cb;
                return;
            }
            redisClient = RedisPool.getLocalRedisClient relatedUsersIds;
            multi = redisClient.multi();
            relatedUsersIds.forEach((relatedUserId) {
                multi.sort relatedUserId;
            });
            multi.exec((err, sres) {
                if err {
                    // `console.err` does not exist; it threw a TypeError and
                    // the callback was never invoked. Log the failure and
                    // answer with what has been counted so far (nothing) so
                    // the caller is not left waiting.
                    console.error err;
                    countIRelatedUsersIds [], cb;
                    return;
                }
                countIRelatedUsersIds sres, cb;
            });
        }

        return {
            "getIRelatedUsersIds": getIRelatedUsersIds,
            "reduce": reduce
        };
    }

    // Sends one shard of related-user ids to the "/related/map/<userId>.json"
    // endpoint and calls cb with the raw response body (a JSON string).
    // The ids travel as a JSON body on a GET request because the matching
    // server route is registered with app.get.
    // NOTE(review): the target node is chosen from `userId`, not from the
    // shard's own keys -- confirm this is the intended routing.
    function MapClient(userId, relatedUsersIds, cb) {
        data = "";
        node = Sharding.getHTTPNodeForKey userId;
        req = http.request({
            host: node.host,
            port: node.port,
            path: "/related/map/#{encodeURIComponent userId}.json",
            method: "GET"
        }, (cres) {
            cres.on "data", (chunk) { data += chunk };
            cres.on "end", { cb data };
        });
        // Without a listener a transport failure is an uncaught 'error' event
        // and cb never runs; report an empty JSON accumulator instead.
        req.on("error", (err) {
            console.error err;
            cb "{}";
        });
        j = JSON.stringify relatedUsersIds;
        // Content-Length counts bytes; String#length counts UTF-16 code units
        // and is too small as soon as an id contains a non-ASCII character.
        req.setHeader "Content-Length", Buffer.byteLength(j);
        req.write j;
        req.end();
    }

    // Aggregation for `userId`: loads the ids related to the user, fans them
    // out per shard through MapClient, sums the per-shard counts and hands the
    // total to Recommendation.reduce.
    //   handleAggRelated(cb) -- cb receives the array of recommended ids.
    function Aggregation(userId) {
        function handleAggRelated_(userId, relatedUsersIds, cb) {
            requestCount = 0, resultCount = 0;
            iAggRelatedAccumulator = { };

            // Every shard has answered: rank the summed counts.
            function onAggAllResults() {
                recommendation = new Recommendation userId, relatedUsersIds;
                recommendation.reduce iAggRelatedAccumulator, cb;
            }

            // Merges one shard's JSON-encoded accumulator into the total.
            function onAggOneResult(data) {
                var iRelatedAccumulator;
                try {
                    iRelatedAccumulator = JSON.parse data;
                } catch e {
                    console.log "#{e}: [#{data}]";
                    // An unreadable reply still has to count as received,
                    // otherwise the aggregation never completes.
                    iRelatedAccumulator = { };
                }
                for iRelatedUserId in iRelatedAccumulator {
                    iAggRelatedAccumulator[iRelatedUserId] ||= 0;
                    iAggRelatedAccumulator[iRelatedUserId] +=
                        iRelatedAccumulator[iRelatedUserId];
                }
                resultCount++;
                resultCount === requestCount && onAggAllResults();
            }

            // Fix the number of expected replies before any request is sent.
            shards = Sharding.splitToShards relatedUsersIds;
            requestCount = shards.length;
            if requestCount <= 0 {
                // Nothing to map: finish with the empty accumulator.
                onAggAllResults();
                return;
            }
            shards.forEach((shard) {
                new MapClient userId, shard, onAggOneResult;
            });
        }

        // Runs a Redis SORT on the key named `userId`; cb receives (err, ids).
        function getRelatedUsersIdsTo(userId, cb) {
            redisClient = RedisPool.getRedisClientForKey userId;
            redisClient.sort userId, cb;
        }

        function handleAggRelated(cb) {
            getRelatedUsersIdsTo(userId, (err, relatedUsersIds) {
                if err {
                    // The lookup failed: log it and continue with no related
                    // ids rather than dereferencing an undefined reply.
                    console.error err;
                    relatedUsersIds = [];
                }
                handleAggRelated_(userId, relatedUsersIds || [], cb);
            });
        }

        return {
            "handleAggRelated": handleAggRelated
        };
    }

    // Pairs the request and response objects of one HTTP exchange in a plain
    // object with the fields `req` and `res`.
    function ClientContext(req, res) {
        var context = { };
        context.req = req;
        context.res = res;
        return context;
    }

    // HTTP front end listening on `port`. Two GET routes are registered:
    //   /related/map/:userId.:format?  -- map step for the ids in the request body
    //   /related/:userId.:format?      -- full aggregation for one user
    // Both reply with a JSON document.
    function Server(port) {
        // Runs the map step and writes the resulting accumulator as JSON.
        function handleRelated(clientContext, userId, relatedUsersIds) {
            recommendation = new Recommendation userId, relatedUsersIds;
            iRelatedAccumulator = recommendation.getIRelatedUsersIds!();
            // "json" alone is not a media type; JSON is "application/json".
            clientContext.res.writeHead 200, {"Content-Type": "application/json"};
            clientContext.res.end(JSON.stringify iRelatedAccumulator);
        }

        // Collects the request body, parses it as the JSON list of related
        // ids and answers 400 when it cannot be parsed.
        function routeForRelated(req, res) {
            clientContext = new ClientContext req, res;
            data = "";
            req.addListener "data", (chunk) { data += chunk };
            req.addListener! "end";
            var relatedUsersIds;
            try {
                relatedUsersIds = JSON.parse data;
            } catch e {
                console.log data;
                clientContext.res.writeHead 400, {"Content-Type": "text/plain"};
                clientContext.res.end "Bogus relatedsUsersIds: [#{data}]";
                return;
            }
            handleRelated clientContext, clientContext.req.params.userId,
                relatedUsersIds;
        }

        // Runs the aggregation for the user in the URL and writes the
        // recommended ids as JSON.
        function routeForAggRelated(req, res) {
            clientContext = new ClientContext req, res;
            aggregation = new Aggregation clientContext.req.params.userId;
            contenders = aggregation.handleAggRelated!();
            clientContext.res.writeHead 200, {"Content-Type": "application/json"};
            clientContext.res.end(JSON.stringify contenders);
        }

        function routesForRoot(app) {
            app.get "/related/map/:userId.:format?", routeForRelated;
            app.get "/related/:userId.:format?", routeForAggRelated;
        }

        server = connect.createServer();
        server.use "/", connect.router routesForRoot;
        server.listen port;
    }

    // Start the HTTP front end. 10000 is the HTTP port of the single entry in
    // Sharding's `servers` table ("127.0.0.1 10000 6378").
    new Server 10000;
  3. jedisct1 created this gist Mar 22, 2011.
    266 changes: 266 additions & 0 deletions Recommendation engine using Map/Reduce, Kaffeine, NodeJS and Redis
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,266 @@
    sys = require "sys";
    http = require "http";
    url = require "url";
    connect = require "connect";
    redis = require "redis";
    hashRing = require "hash_ring";

    // Sharding: consistent-hash routing of keys onto nodes.
    // Every ring entry is one string of the form "<host> <http_port> <redis_port>".
    // The value attached to an entry in `servers` is handed to hash_ring as-is
    // (presumably the node weight -- confirm against the hash_ring documentation).
    Sharding = {
        servers = { "127.0.0.1 10000 6378": 1 };
        ring = new hashRing.HashRing servers;

        // Splits a ring entry into { host, http_port, redis_port }.
        // Ports are parsed with an explicit radix so they are always decimal.
        function getComponentsForEntry(entry) {
            entry_ = entry.split " ";
            return {
                host: entry_[0],
                http_port: parseInt(entry_[1], 10),
                redis_port: parseInt(entry_[2], 10)
            };
        }

        // Returns the parsed ring entry that owns `key`.
        function getNodeForKey(key) {
            getComponentsForEntry ring.getNode key;
        }

        // Returns { host, port } of the Redis instance that owns `key`.
        function getRedisNodeForKey(key) {
            entry = getNodeForKey key;
            return {
                host: entry.host,
                port: entry.redis_port
            };
        }

        // Returns { host, port } of the HTTP server that owns `key`.
        function getHTTPNodeForKey(key) {
            entry = getNodeForKey key;
            return {
                host: entry.host,
                port: entry.http_port
            };
        }

        // Returns the Redis node for a key or for a list of keys; for a list,
        // the first key decides. Throws the string "keys.length <= 0" when
        // given an empty list.
        function getLocalRedisNode(keys) {
            // `typeof x` is never "Array" (arrays report "object"), so the
            // previous test wrapped arrays a second time and looked up the
            // whole array instead of its first key.
            Array.isArray(keys) || (keys = [keys]);
            if keys.length <= 0, throw "keys.length <= 0";
            getRedisNodeForKey keys[0];
        }

        // Groups `items` by the ring entry that owns each of them and returns
        // the groups as an array of arrays (one inner array per entry).
        function splitToShards(items) {
            shards = {};
            items.forEach((item) {
                entry = ring.getNode item;
                shards[entry] ||= [];
                shards[entry].push(item);
            });
            shards_ = [];
            for entry in shards {
                shards_.push shards[entry];
                delete shards[entry];
            }
            shards_;
        }

        return {
            "getRedisNodeForKey": getRedisNodeForKey,
            "getHTTPNodeForKey": getHTTPNodeForKey,
            "getLocalRedisNode": getLocalRedisNode,
            "splitToShards": splitToShards
        };
    }();

    // RedisPool: keeps one Redis client per "host:port" and hands the same
    // client back on every later request for that node.
    RedisPool = {
        pool = { };

        // Returns the pooled client for `node`, creating and storing it on
        // first use.
        function _returnNode(node) {
            cacheKey = "#{node.host}:#{node.port}";
            cached = pool[cacheKey];
            if cached, return cached;
            return pool[cacheKey] = redis.createClient(node.port, node.host);
        }

        // Client for the Redis node that owns `key`.
        function getRedisClientForKey(key) {
            return _returnNode(Sharding.getRedisNodeForKey(key));
        }

        // Client for the Redis node selected by Sharding.getLocalRedisNode.
        function getLocalRedisClient(keys) {
            return _returnNode(Sharding.getLocalRedisNode(keys));
        }

        return {
            "getRedisClientForKey": getRedisClientForKey,
            "getLocalRedisClient": getLocalRedisClient
        };
    }();

    // Recommendation logic for `userId`, given the ids it is already related to.
    //   getIRelatedUsersIds(cb) -- map step: counts how often each candidate id
    //                              appears in the Redis SORT results of the
    //                              related users' keys, then calls cb(counts).
    //   reduce(counts, cb)      -- reduce step: filters, ranks and truncates the
    //                              candidates, then calls cb(ids).
    function Recommendation(userId, relatedUsersIds) {
        iRelatedAccumulator = { }, cutoff = 1, limit = 50;

        // Keeps candidates whose count is strictly greater than `cutoff`,
        // orders them by descending count and passes at most `limit` ids to cb.
        function reduce(iRelatedAccumulator, cb) {
            contenders = [];
            for iRelatedUserId in iRelatedAccumulator {
                count = iRelatedAccumulator[iRelatedUserId];
                if count <= cutoff, continue;
                contenders.push [iRelatedUserId, count];
            }
            iRelatedAccumulator = null;
            contenders.sort (a, b) { b[1] - a[1] };
            // slice's end index is exclusive: `limit - 1` returned one id too few.
            contenders = contenders.slice(0, limit).map((a) { a[0] });
            cb contenders;
        }

        // Adds every id found in `sres` (an array of rows, one per related
        // user) to the accumulator, skipping ids that are already related and
        // finally removing the user itself.
        function countIRelatedUsersIds(sres, cb) {
            sres.forEach((row) {
                row.forEach((iRelatedUserId) {
                    if relatedUsersIds.indexOf(iRelatedUserId) === -1 {
                        iRelatedAccumulator[iRelatedUserId] ||= 0;
                        iRelatedAccumulator[iRelatedUserId]++;
                    }
                });
            });
            delete iRelatedAccumulator[userId];
            cb iRelatedAccumulator;
        }

        // Queues one SORT per related user in a single MULTI on the Redis
        // client chosen from `relatedUsersIds`, then counts the replies.
        function getIRelatedUsersIds(cb) {
            if relatedUsersIds.length <= 0 {
                countIRelatedUsersIds [], cb;
                return;
            }
            redisClient = RedisPool.getLocalRedisClient relatedUsersIds;
            multi = redisClient.multi();
            relatedUsersIds.forEach((relatedUserId) {
                multi.sort relatedUserId;
            });
            multi.exec((err, sres) {
                if err {
                    // `console.err` does not exist; it threw a TypeError and
                    // the callback was never invoked. Log the failure and
                    // answer with what has been counted so far (nothing) so
                    // the caller is not left waiting.
                    console.error err;
                    countIRelatedUsersIds [], cb;
                    return;
                }
                countIRelatedUsersIds sres, cb;
            });
        }

        return {
            "getIRelatedUsersIds": getIRelatedUsersIds,
            "reduce": reduce
        };
    }

    // Sends one shard of related-user ids to the "/related/map/<userId>.json"
    // endpoint and calls cb with the raw response body (a JSON string).
    // The ids travel as a JSON body on a GET request because the matching
    // server route is registered with app.get.
    // NOTE(review): the target node is chosen from `userId`, not from the
    // shard's own keys -- confirm this is the intended routing.
    function MapClient(userId, relatedUsersIds, cb) {
        data = "";
        node = Sharding.getHTTPNodeForKey userId;
        req = http.request({
            host: node.host,
            port: node.port,
            path: "/related/map/#{encodeURIComponent userId}.json",
            method: "GET"
        }, (cres) {
            cres.on "data", (chunk) { data += chunk };
            cres.on "end", { cb data };
        });
        // Without a listener a transport failure is an uncaught 'error' event
        // and cb never runs; report an empty JSON accumulator instead.
        req.on("error", (err) {
            console.error err;
            cb "{}";
        });
        j = JSON.stringify relatedUsersIds;
        // Content-Length counts bytes; String#length counts UTF-16 code units
        // and is too small as soon as an id contains a non-ASCII character.
        req.setHeader "Content-Length", Buffer.byteLength(j);
        req.write j;
        req.end();
    }

    // Aggregation for `userId`: loads the ids related to the user, fans them
    // out per shard through MapClient, sums the per-shard counts and hands the
    // total to Recommendation.reduce.
    //   handleAggRelated(cb) -- cb receives the array of recommended ids.
    function Aggregation(userId) {
        function handleAggRelated_(userId, relatedUsersIds, cb) {
            requestCount = 0, resultCount = 0;
            iAggRelatedAccumulator = { };

            // Every shard has answered: rank the summed counts.
            function onAggAllResults() {
                recommendation = new Recommendation userId, relatedUsersIds;
                recommendation.reduce iAggRelatedAccumulator, cb;
            }

            // Merges one shard's JSON-encoded accumulator into the total.
            function onAggOneResult(data) {
                var iRelatedAccumulator;
                try {
                    iRelatedAccumulator = JSON.parse data;
                } catch e {
                    console.log "#{e}: [#{data}]";
                    // An unreadable reply still has to count as received,
                    // otherwise the aggregation never completes.
                    iRelatedAccumulator = { };
                }
                for iRelatedUserId in iRelatedAccumulator {
                    iAggRelatedAccumulator[iRelatedUserId] ||= 0;
                    iAggRelatedAccumulator[iRelatedUserId] +=
                        iRelatedAccumulator[iRelatedUserId];
                }
                resultCount++;
                resultCount === requestCount && onAggAllResults();
            }

            // Fix the number of expected replies before any request is sent.
            shards = Sharding.splitToShards relatedUsersIds;
            requestCount = shards.length;
            if requestCount <= 0 {
                // Nothing to map: finish with the empty accumulator.
                onAggAllResults();
                return;
            }
            shards.forEach((shard) {
                new MapClient userId, shard, onAggOneResult;
            });
        }

        // Runs a Redis SORT on the key named `userId`; cb receives (err, ids).
        function getRelatedUsersIdsTo(userId, cb) {
            redisClient = RedisPool.getRedisClientForKey userId;
            redisClient.sort userId, cb;
        }

        function handleAggRelated(cb) {
            getRelatedUsersIdsTo(userId, (err, relatedUsersIds) {
                if err {
                    // The lookup failed: log it and continue with no related
                    // ids rather than dereferencing an undefined reply.
                    console.error err;
                    relatedUsersIds = [];
                }
                handleAggRelated_(userId, relatedUsersIds || [], cb);
            });
        }

        return {
            "handleAggRelated": handleAggRelated
        };
    }

    // Pairs the request and response objects of one HTTP exchange in a plain
    // object with the fields `req` and `res`.
    function ClientContext(req, res) {
        var context = { };
        context.req = req;
        context.res = res;
        return context;
    }

    // HTTP front end listening on `port`. Two GET routes are registered:
    //   /related/map/:userId.:format?  -- map step for the ids in the request body
    //   /related/:userId.:format?      -- full aggregation for one user
    // Both reply with a JSON document.
    function Server(port) {
        // Runs the map step and writes the resulting accumulator as JSON.
        function handleRelated(clientContext, userId, relatedUsersIds) {
            recommendation = new Recommendation userId, relatedUsersIds;
            iRelatedAccumulator = recommendation.getIRelatedUsersIds!();
            // "json" alone is not a media type; JSON is "application/json".
            clientContext.res.writeHead 200, {"Content-Type": "application/json"};
            clientContext.res.end(JSON.stringify iRelatedAccumulator);
        }

        // Collects the request body, parses it as the JSON list of related
        // ids and answers 400 when it cannot be parsed.
        function routeForRelated(req, res) {
            clientContext = new ClientContext req, res;
            data = "";
            req.addListener "data", (chunk) { data += chunk };
            req.addListener! "end";
            var relatedUsersIds;
            try {
                relatedUsersIds = JSON.parse data;
            } catch e {
                console.log data;
                clientContext.res.writeHead 400, {"Content-Type": "text/plain"};
                clientContext.res.end "Bogus relatedsUsersIds: [#{data}]";
                return;
            }
            handleRelated clientContext, clientContext.req.params.userId,
                relatedUsersIds;
        }

        // Runs the aggregation for the user in the URL and writes the
        // recommended ids as JSON.
        function routeForAggRelated(req, res) {
            clientContext = new ClientContext req, res;
            aggregation = new Aggregation clientContext.req.params.userId;
            contenders = aggregation.handleAggRelated!();
            clientContext.res.writeHead 200, {"Content-Type": "application/json"};
            clientContext.res.end(JSON.stringify contenders);
        }

        function routesForRoot(app) {
            app.get "/related/map/:userId.:format?", routeForRelated;
            app.get "/related/:userId.:format?", routeForAggRelated;
        }

        server = connect.createServer();
        server.use "/", connect.router routesForRoot;
        server.listen port;
    }

    // Start the HTTP front end. 10000 is the HTTP port of the single entry in
    // Sharding's `servers` table ("127.0.0.1 10000 6378").
    new Server 10000;