Skip to content

Instantly share code, notes, and snippets.

@philipyoungg
Last active January 2, 2021 12:32
Show Gist options
  • Select an option

  • Save philipyoungg/322b94e9fd0e1388eaa68b86f3ce76eb to your computer and use it in GitHub Desktop.

Select an option

Save philipyoungg/322b94e9fd0e1388eaa68b86f3ce76eb to your computer and use it in GitHub Desktop.

Revisions

  1. Philip Young revised this gist Jan 2, 2021. 1 changed file with 66 additions and 83 deletions.
    149 changes: 66 additions & 83 deletions websocket_redis_pubsub.ts
    Original file line number Diff line number Diff line change
    @@ -5,9 +5,11 @@ import * as http from "http";

    function noop() {}

    const sync_payload = JSON.stringify({ type: "SYNC" });
    enum WebsocketSendType {
    BROADCAST_EMIT = "BROADCAST_EMIT",
    }

    type WebsocketHandlerConfig = {
    export type WebsocketPubsubConfig = {
    interval_ms?: number;
    websocket_config?: Websocket.ServerOptions;
    redis_config?: redis.ClientOpts;
    @@ -24,18 +26,20 @@ type WebsocketEnhanced = Websocket & {
    id: string;
    is_alive: boolean;
    room_id?: string;
    broadcastEmit: (payload: string, room_id: string) => void;
    joinRoom: (room_id: string) => void;
    };

    /**
    * Currently only handle one room per user
    */
    class WebsocketHandler {
    pub: redis.RedisClient;
    sub: redis.RedisClient;
    wss: Websocket.Server;
    ROOM_MAP: Record<string, Websocket[]> = {};
    config: WebsocketHandlerConfig;
    constructor(__RAW_CONFIG__: WebsocketHandlerConfig = {}) {
    class WebsocketPubSub {
    private pub: redis.RedisClient;
    private sub: redis.RedisClient;
    private wss: Websocket.Server;
    private ROOM_MAP: Record<string, Websocket[]> = {};
    private config: WebsocketPubsubConfig;
    constructor(__RAW_CONFIG__: WebsocketPubsubConfig = {}) {
    this.config = {
    ...{
    interval_ms: 30000,
    @@ -61,26 +65,50 @@ class WebsocketHandler {
    }

    onConnection = (ws: WebsocketEnhanced, decoded: unknown) => {
    let user_id = decoded as string;
    this.joinRoom(user_id, ws);
    ws.send(sync_payload);
    ws.on("message", (msg) => this.onMessage(msg as string, ws));
    console.log("onConnection not handled");
    };

    onMessage = (msg: string, ws: WebsocketEnhanced) => {
    console.log("onMessage not handled");
    };

    private joinRoom = (room_id: string, ws: WebsocketEnhanced) => {
    // console.log("JOIN");
    ws.room_id = room_id;
    if (this.ROOM_MAP.hasOwnProperty(room_id)) {
    this.ROOM_MAP[room_id].push(ws);
    } else {
    this.ROOM_MAP[room_id] = [ws];
    this.onJoinNewRoom(room_id);
    }
    };

    private broadcastEmit = (
    payload: string,
    room_id: string,
    ws: WebsocketEnhanced
    ) => {
    try {
    let parsed: { type: string } = JSON.parse(msg);
    if (parsed.type == "SYNC") {
    this.broadcastEmit(sync_payload, ws.room_id, ws);
    }
    } catch {
    console.log("INVALID EVENT SENT FROM CLIENT");
    let pubsubPayload: PubsubPayload = {
    sender_ws_id: ws.id,
    type: WebsocketSendType.BROADCAST_EMIT,
    payload,
    };
    this.pub.publish(
    this.pubSubChannelId(ws.room_id),
    JSON.stringify(pubsubPayload)
    );
    } catch (err) {
    console.error("Parsing data error inside broadcast emit", err);
    }
    };

    /** Internal APIs */

    setupAndOnConnection = (ws: WebsocketEnhanced, req: http.IncomingMessage) => {
    private setupAndOnConnection = (
    ws: WebsocketEnhanced,
    req: http.IncomingMessage
    ) => {
    /** Decode user id and generate socket id */
    let decoded = req.headers.decoded;

    @@ -91,22 +119,25 @@ class WebsocketHandler {
    this.onConnection(ws, decoded);
    };

    initWS(ws: WebsocketEnhanced) {
    private initWS(ws: WebsocketEnhanced) {
    ws.is_alive = true;
    ws.id = v4();
    ws.broadcastEmit = (payload: string, room_id: string) =>
    this.broadcastEmit(payload, room_id, ws);
    ws.joinRoom = (room_id: string) => this.joinRoom(room_id, ws);
    }

    pong = (ws: WebsocketEnhanced) => {
    private pong = (ws: WebsocketEnhanced) => {
    ws.is_alive = true;
    };

    close = (ws: WebsocketEnhanced) => {
    private close = (ws: WebsocketEnhanced) => {
    this.leaveRoom(ws);
    ws.terminate();
    };

    onHeartbeat = () => {
    console.log("HEARTBEAT");
    private onHeartbeat = () => {
    console.log("HEARTBEAT", this.wss.clients.size);
    let clients = this.wss.clients as Set<WebsocketEnhanced>;
    clients.forEach((ss) => {
    let ws = ss as WebsocketEnhanced;
    @@ -118,18 +149,7 @@ class WebsocketHandler {
    });
    };

    joinRoom = (room_id: string, ws: WebsocketEnhanced) => {
    console.log("JOIN");
    ws.room_id = room_id;
    if (this.ROOM_MAP.hasOwnProperty(room_id)) {
    this.ROOM_MAP[room_id].push(ws);
    } else {
    this.ROOM_MAP[room_id] = [ws];
    this.onJoinNewRoom(room_id);
    }
    };

    leaveRoom = (ws: WebsocketEnhanced) => {
    private leaveRoom = (ws: WebsocketEnhanced) => {
    let room_id = ws.room_id;

    if (!room_id || !this.ROOM_MAP[room_id]) {
    @@ -146,53 +166,37 @@ class WebsocketHandler {
    }
    };

    onJoinNewRoom = (room_id: string) => {
    private onJoinNewRoom = (room_id: string) => {
    console.log("JOIN NEW ROOM");
    this.sub.subscribe(this.pubSubChannelId(room_id));
    };

    onLastLeaveRoom = (room_id: string) => {
    private onLastLeaveRoom = (room_id: string) => {
    console.log("LAST LEAVE ROOM");
    this.sub.unsubscribe(this.pubSubChannelId(room_id));
    };

    pubSubChannelId = (room_id: string) =>
    private pubSubChannelId = (room_id: string) =>
    [this.config.redis_pubsub_name, room_id].join(":");

    isCorrectChannel(channel: string) {
    private isCorrectChannel(channel: string) {
    return channel == this.config.redis_pubsub_name;
    }

    socketsInRoom = (room_id: string): WebsocketEnhanced[] =>
    private socketsInRoom = (room_id: string): WebsocketEnhanced[] =>
    (this.ROOM_MAP[room_id] as WebsocketEnhanced[]) || [];

    broadcastEmit = (payload: string, room_id: string, ws: WebsocketEnhanced) => {
    try {
    let pubsubPayload: PubsubPayload = {
    sender_ws_id: ws.id,
    type: "BROADCAST_EMIT",
    payload,
    };
    this.pub.publish(
    this.pubSubChannelId(ws.room_id),
    JSON.stringify(pubsubPayload)
    );
    } catch (err) {
    console.error("Parsing data error inside broadcast emit", err);
    }
    };

    onRedisMessage = (channel: string, data: string) => {
    private onRedisMessage = (channel: string, data: string) => {
    const [pubsub_name, room_id] = channel.split(":");
    if (this.isCorrectChannel(pubsub_name)) {
    try {
    let { type, sender_ws_id, payload } = JSON.parse(data) as PubsubPayload;

    switch (type) {
    case "BROADCAST_EMIT":
    case WebsocketSendType.BROADCAST_EMIT:
    this.socketsInRoom(room_id).forEach((ws) => {
    if (ws.id !== sender_ws_id && ws.readyState == Websocket.OPEN) {
    console.log("BROADCAST EMIT ON ", payload);
    // console.log("BROADCAST EMIT ON ", payload);
    ws.send(payload);
    }
    });
    @@ -207,25 +211,4 @@ class WebsocketHandler {
    };
    }

    let config: WebsocketHandlerConfig = {
    interval_ms: 3000,
    websocket_config: {
    port: Number(process.env.PORT) || 8080,
    verifyClient: (info, done) => {
    let token = info.req.headers["sec-websocket-protocol"];
    if (token) {
    try {
    info.req.headers.decoded = token;
    return done(true);
    } catch {
    return done(false, 401, "Unauthorized");
    }
    }

    return done(false, 401, "Unauthorized");
    },
    },
    redis_pubsub_name: "SESSION_PUBSUB",
    };

    new WebsocketHandler(config);
    export default WebsocketPubSub;
  2. Philip Young revised this gist Jan 1, 2021. 1 changed file with 4 additions and 3 deletions.
    7 changes: 4 additions & 3 deletions websocket_redis_pubsub.ts
    Original file line number Diff line number Diff line change
    @@ -60,7 +60,8 @@ class WebsocketHandler {
    setInterval(this.onHeartbeat, this.config.interval_ms);
    }

    onConnection = (ws: WebsocketEnhanced, user_id: string) => {
    onConnection = (ws: WebsocketEnhanced, decoded: unknown) => {
    let user_id = decoded as string;
    this.joinRoom(user_id, ws);
    ws.send(sync_payload);
    ws.on("message", (msg) => this.onMessage(msg as string, ws));
    @@ -81,13 +82,13 @@ class WebsocketHandler {

    setupAndOnConnection = (ws: WebsocketEnhanced, req: http.IncomingMessage) => {
    /** Decode user id and generate socket id */
    let room_id = req.headers.decoded as string;
    let decoded = req.headers.decoded;

    this.initWS(ws);
    ws.on("pong", () => this.pong(ws));
    ws.on("close", () => this.close(ws));

    this.onConnection(ws, room_id);
    this.onConnection(ws, decoded);
    };

    initWS(ws: WebsocketEnhanced) {
  3. Philip Young revised this gist Jan 1, 2021. 1 changed file with 14 additions and 10 deletions.
    24 changes: 14 additions & 10 deletions websocket_redis_pubsub.ts
    Original file line number Diff line number Diff line change
    @@ -53,22 +53,15 @@ class WebsocketHandler {
    /** Subscribe to redis and handle connection */
    this.sub.on("message", this.onRedisMessage);
    this.wss.on("connection", (ws, req) =>
    this.onConnection(ws as WebsocketEnhanced, req)
    this.setupAndOnConnection(ws as WebsocketEnhanced, req)
    );

    /** Handle heartbeat */
    setInterval(this.onHeartbeat, this.config.interval_ms);
    }

    onConnection = (ws: WebsocketEnhanced, req: http.IncomingMessage) => {
    /** Decode user id and generate socket id */
    let room_id = req.headers.decoded as string;

    this.initWS(ws);
    this.joinRoom(room_id, ws);

    ws.on("pong", () => this.pong(ws));
    ws.on("close", () => this.close(ws));
    onConnection = (ws: WebsocketEnhanced, user_id: string) => {
    this.joinRoom(user_id, ws);
    ws.send(sync_payload);
    ws.on("message", (msg) => this.onMessage(msg as string, ws));
    };
    @@ -86,6 +79,17 @@ class WebsocketHandler {

    /** Internal APIs */

    setupAndOnConnection = (ws: WebsocketEnhanced, req: http.IncomingMessage) => {
    /** Decode user id and generate socket id */
    let room_id = req.headers.decoded as string;

    this.initWS(ws);
    ws.on("pong", () => this.pong(ws));
    ws.on("close", () => this.close(ws));

    this.onConnection(ws, room_id);
    };

    initWS(ws: WebsocketEnhanced) {
    ws.is_alive = true;
    ws.id = v4();
  4. Philip Young revised this gist Jan 1, 2021. 1 changed file with 48 additions and 46 deletions.
    94 changes: 48 additions & 46 deletions websocket_redis_pubsub.ts
    Original file line number Diff line number Diff line change
    @@ -60,11 +60,59 @@ class WebsocketHandler {
    setInterval(this.onHeartbeat, this.config.interval_ms);
    }

    onConnection = (ws: WebsocketEnhanced, req: http.IncomingMessage) => {
    /** Decode user id and generate socket id */
    let room_id = req.headers.decoded as string;

    this.initWS(ws);
    this.joinRoom(room_id, ws);

    ws.on("pong", () => this.pong(ws));
    ws.on("close", () => this.close(ws));
    ws.send(sync_payload);
    ws.on("message", (msg) => this.onMessage(msg as string, ws));
    };

    onMessage = (msg: string, ws: WebsocketEnhanced) => {
    try {
    let parsed: { type: string } = JSON.parse(msg);
    if (parsed.type == "SYNC") {
    this.broadcastEmit(sync_payload, ws.room_id, ws);
    }
    } catch {
    console.log("INVALID EVENT SENT FROM CLIENT");
    }
    };

    /** Internal APIs */

    initWS(ws: WebsocketEnhanced) {
    ws.is_alive = true;
    ws.id = v4();
    }

    pong = (ws: WebsocketEnhanced) => {
    ws.is_alive = true;
    };

    close = (ws: WebsocketEnhanced) => {
    this.leaveRoom(ws);
    ws.terminate();
    };

    onHeartbeat = () => {
    console.log("HEARTBEAT");
    let clients = this.wss.clients as Set<WebsocketEnhanced>;
    clients.forEach((ss) => {
    let ws = ss as WebsocketEnhanced;
    if (!ws.is_alive) {
    this.close(ws);
    }
    ws.is_alive = false;
    ws.ping(noop);
    });
    };

    joinRoom = (room_id: string, ws: WebsocketEnhanced) => {
    console.log("JOIN");
    ws.room_id = room_id;
    @@ -76,15 +124,6 @@ class WebsocketHandler {
    }
    };

    pong = (ws: WebsocketEnhanced) => {
    ws.is_alive = true;
    };

    close = (ws: WebsocketEnhanced) => {
    this.leaveRoom(ws);
    ws.terminate();
    };

    leaveRoom = (ws: WebsocketEnhanced) => {
    let room_id = ws.room_id;

    @@ -122,43 +161,6 @@ class WebsocketHandler {
    socketsInRoom = (room_id: string): WebsocketEnhanced[] =>
    (this.ROOM_MAP[room_id] as WebsocketEnhanced[]) || [];

    onHeartbeat = () => {
    console.log("HEARTBEAT");
    let clients = this.wss.clients as Set<WebsocketEnhanced>;
    clients.forEach((ss) => {
    let ws = ss as WebsocketEnhanced;
    if (!ws.is_alive) {
    this.close(ws);
    }
    ws.is_alive = false;
    ws.ping(noop);
    });
    };

    onConnection = (ws: WebsocketEnhanced, req: http.IncomingMessage) => {
    /** Decode user id and generate socket id */
    let room_id = req.headers.decoded as string;

    this.initWS(ws);
    this.joinRoom(room_id, ws);

    ws.on("pong", () => this.pong(ws));
    ws.on("close", () => this.close(ws));
    ws.send(sync_payload);
    ws.on("message", (msg) => this.onMessage(msg as string, ws));
    };

    onMessage = (msg: string, ws: WebsocketEnhanced) => {
    try {
    let parsed: { type: string } = JSON.parse(msg);
    if (parsed.type == "SYNC") {
    this.broadcastEmit(sync_payload, ws.room_id, ws);
    }
    } catch {
    console.log("INVALID EVENT SENT FROM CLIENT");
    }
    };

    broadcastEmit = (payload: string, room_id: string, ws: WebsocketEnhanced) => {
    try {
    let pubsubPayload: PubsubPayload = {
  5. Philip Young revised this gist Jan 1, 2021. 1 changed file with 2 additions and 2 deletions.
    4 changes: 2 additions & 2 deletions websocket_redis_pubsub.ts
    Original file line number Diff line number Diff line change
    @@ -65,7 +65,7 @@ class WebsocketHandler {
    ws.id = v4();
    }

    join = (room_id: string, ws: WebsocketEnhanced) => {
    joinRoom = (room_id: string, ws: WebsocketEnhanced) => {
    console.log("JOIN");
    ws.room_id = room_id;
    if (this.ROOM_MAP.hasOwnProperty(room_id)) {
    @@ -140,7 +140,7 @@ class WebsocketHandler {
    let room_id = req.headers.decoded as string;

    this.initWS(ws);
    this.join(room_id, ws);
    this.joinRoom(room_id, ws);

    ws.on("pong", () => this.pong(ws));
    ws.on("close", () => this.close(ws));
  6. Philip Young revised this gist Jan 1, 2021. 1 changed file with 3 additions and 3 deletions.
    6 changes: 3 additions & 3 deletions websocket_redis_pubsub.ts
    Original file line number Diff line number Diff line change
    @@ -35,19 +35,19 @@ class WebsocketHandler {
    wss: Websocket.Server;
    ROOM_MAP: Record<string, Websocket[]> = {};
    config: WebsocketHandlerConfig;
    constructor(__unsafe_config: WebsocketHandlerConfig = {}) {
    constructor(__RAW_CONFIG__: WebsocketHandlerConfig = {}) {
    this.config = {
    ...{
    interval_ms: 30000,
    websocket_config: { port: 8080 },
    redis_config: {},
    redis_pubsub_name: "REDIS_PUBSUB",
    },
    ...__unsafe_config,
    ...__RAW_CONFIG__,
    };
    /** Initialize pub sub and wss */
    this.pub = redis.createClient(this.config.redis_config);
    this.sub = redis.createClient(this.config.redis_config);
    this.sub = this.pub.duplicate();
    this.wss = new Websocket.Server(this.config.websocket_config);

    /** Subscribe to redis and handle connection */
  7. Philip Young revised this gist Jan 1, 2021. 1 changed file with 5 additions and 5 deletions.
    10 changes: 5 additions & 5 deletions websocket_redis_pubsub.ts
    Original file line number Diff line number Diff line change
    @@ -35,20 +35,20 @@ class WebsocketHandler {
    wss: Websocket.Server;
    ROOM_MAP: Record<string, Websocket[]> = {};
    config: WebsocketHandlerConfig;
    constructor(config: WebsocketHandlerConfig = {}) {
    constructor(__unsafe_config: WebsocketHandlerConfig = {}) {
    this.config = {
    ...{
    interval_ms: 30000,
    websocket_config: { port: 8080 },
    redis_config: {},
    redis_pubsub_name: "REDIS_PUBSUB",
    },
    ...config,
    ...__unsafe_config,
    };
    /** Initialize pub sub and wss */
    this.pub = redis.createClient(config.redis_config);
    this.sub = redis.createClient(config.redis_config);
    this.wss = new Websocket.Server(config.websocket_config);
    this.pub = redis.createClient(this.config.redis_config);
    this.sub = redis.createClient(this.config.redis_config);
    this.wss = new Websocket.Server(this.config.websocket_config);

    /** Subscribe to redis and handle connection */
    this.sub.on("message", this.onRedisMessage);
  8. Philip Young renamed this gist Jan 1, 2021. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  9. Philip Young renamed this gist Jan 1, 2021. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  10. Philip Young revised this gist Jan 1, 2021. No changes.
  11. Philip Young revised this gist Jan 1, 2021. No changes.
  12. Philip Young revised this gist Jan 1, 2021. 1 changed file with 22 additions and 19 deletions.
    41 changes: 22 additions & 19 deletions Websocket Redis PubSub.ts
    Original file line number Diff line number Diff line change
    @@ -7,13 +7,19 @@ function noop() {}

    const sync_payload = JSON.stringify({ type: "SYNC" });

    type Config = {
    type WebsocketHandlerConfig = {
    interval_ms?: number;
    websocket_config?: Websocket.ServerOptions;
    redis_config?: redis.ClientOpts;
    redis_pubsub_name?: string;
    };

    type PubsubPayload = {
    type: string;
    payload: string;
    sender_ws_id: string;
    };

    type WebsocketEnhanced = Websocket & {
    id: string;
    is_alive: boolean;
    @@ -28,8 +34,8 @@ class WebsocketHandler {
    sub: redis.RedisClient;
    wss: Websocket.Server;
    ROOM_MAP: Record<string, Websocket[]> = {};
    config: Config;
    constructor(config: Config = {}) {
    config: WebsocketHandlerConfig;
    constructor(config: WebsocketHandlerConfig = {}) {
    this.config = {
    ...{
    interval_ms: 30000,
    @@ -138,7 +144,7 @@ class WebsocketHandler {

    ws.on("pong", () => this.pong(ws));
    ws.on("close", () => this.close(ws));
    ws.send(JSON.stringify({ type: "SYNC" }));
    ws.send(sync_payload);
    ws.on("message", (msg) => this.onMessage(msg as string, ws));
    };

    @@ -154,29 +160,26 @@ class WebsocketHandler {
    };

    broadcastEmit = (payload: string, room_id: string, ws: WebsocketEnhanced) => {
    this.pub.publish(
    this.pubSubChannelId(ws.room_id),
    JSON.stringify({
    try {
    let pubsubPayload: PubsubPayload = {
    sender_ws_id: ws.id,
    type: "BROADCAST_EMIT",
    payload,
    })
    );
    };
    this.pub.publish(
    this.pubSubChannelId(ws.room_id),
    JSON.stringify(pubsubPayload)
    );
    } catch (err) {
    console.error("Parsing data error inside broadcast emit", err);
    }
    };

    onRedisMessage = (channel: string, data: string) => {
    const [pubsub_name, room_id] = channel.split(":");
    if (this.isCorrectChannel(pubsub_name)) {
    try {
    let {
    type,
    sender_ws_id,
    payload,
    }: {
    type: string;
    payload: string;
    sender_ws_id: string;
    } = JSON.parse(data);
    let { type, sender_ws_id, payload } = JSON.parse(data) as PubsubPayload;

    switch (type) {
    case "BROADCAST_EMIT":
    @@ -197,7 +200,7 @@ class WebsocketHandler {
    };
    }

    let config: Config = {
    let config: WebsocketHandlerConfig = {
    interval_ms: 3000,
    websocket_config: {
    port: Number(process.env.PORT) || 8080,
  13. Philip Young revised this gist Jan 1, 2021. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion Websocket Redis PubSub.ts
    Original file line number Diff line number Diff line change
    @@ -182,7 +182,7 @@ class WebsocketHandler {
    case "BROADCAST_EMIT":
    this.socketsInRoom(room_id).forEach((ws) => {
    if (ws.id !== sender_ws_id && ws.readyState == Websocket.OPEN) {
    console.log("SEND SYNC");
    console.log("BROADCAST EMIT ON ", payload);
    ws.send(payload);
    }
    });
  14. Philip Young revised this gist Jan 1, 2021. 1 changed file with 17 additions and 10 deletions.
    27 changes: 17 additions & 10 deletions Websocket Redis PubSub.ts
    Original file line number Diff line number Diff line change
    @@ -20,6 +20,9 @@ type WebsocketEnhanced = Websocket & {
    room_id?: string;
    };

    /**
    * Currently only handle one room per user
    */
    class WebsocketHandler {
    pub: redis.RedisClient;
    sub: redis.RedisClient;
    @@ -78,15 +81,18 @@ class WebsocketHandler {

    leaveRoom = (ws: WebsocketEnhanced) => {
    let room_id = ws.room_id;
    if (room_id && this.ROOM_MAP[room_id]) {
    this.ROOM_MAP[room_id] = this.ROOM_MAP[room_id].filter(
    (client) => client !== ws
    );

    if (this.ROOM_MAP[room_id].length == 0) {
    delete this.ROOM_MAP[room_id];
    this.onLastLeaveRoom(room_id);
    }

    if (!room_id || !this.ROOM_MAP[room_id]) {
    return;
    }

    this.ROOM_MAP[room_id] = this.ROOM_MAP[room_id].filter(
    (client) => client !== ws
    );

    if (this.ROOM_MAP[room_id].length == 0) {
    delete this.ROOM_MAP[room_id];
    this.onLastLeaveRoom(room_id);
    }
    };

    @@ -112,7 +118,8 @@ class WebsocketHandler {

    onHeartbeat = () => {
    console.log("HEARTBEAT");
    this.wss.clients.forEach((ss) => {
    let clients = this.wss.clients as Set<WebsocketEnhanced>;
    clients.forEach((ss) => {
    let ws = ss as WebsocketEnhanced;
    if (!ws.is_alive) {
    this.close(ws);
  15. Philip Young revised this gist Jan 1, 2021. 1 changed file with 149 additions and 88 deletions.
    237 changes: 149 additions & 88 deletions Websocket Redis PubSub.ts
    Original file line number Diff line number Diff line change
    @@ -1,153 +1,214 @@
    const SYNC = "SYNC";
    import * as redis from "redis";
    import * as Websocket from "ws";
    import { v4 } from "uuid";
    import * as http from "http";

    function noop() {}

    type SocketObject = { id: string; is_alive: boolean; ws: Websocket };
    const sync_payload = JSON.stringify({ type: "SYNC" });

    type Config = {
    interval_ms?: number;
    websocket_config?: Websocket.ServerOptions;
    redis_pub_config?: redis.ClientOpts;
    redis_sub_config?: redis.ClientOpts;
    redis_config?: redis.ClientOpts;
    redis_pubsub_name?: string;
    };

    type WebsocketEnhanced = Websocket & {
    id: string;
    is_alive: boolean;
    room_id?: string;
    };

    class WebsocketHandler {
    pub: redis.RedisClient;
    sub: redis.RedisClient;
    wss: Websocket.Server;
    roomToSocketMap: Record<string, SocketObject[]> = {};
    ROOM_MAP: Record<string, Websocket[]> = {};
    config: Config;
    constructor(config: Config = {}) {
    this.config = {
    ...{
    interval_ms: 30000,
    websocket_config: { port: 8080 },
    redis_pub_config: {},
    redis_sub_config: {},
    redis_config: {},
    redis_pubsub_name: "REDIS_PUBSUB",
    },
    ...config,
    };
    /** Initialize pub sub and wss */
    this.pub = redis.createClient(config.redis_pub_config);
    this.sub = redis.createClient(config.redis_sub_config);
    this.pub = redis.createClient(config.redis_config);
    this.sub = redis.createClient(config.redis_config);
    this.wss = new Websocket.Server(config.websocket_config);

    /** Subscribe to redis and handle connection */
    this.sub.on("message", this.handleRedisMessage);
    this.wss.on("connection", this.handleConnection);
    this.sub.on("message", this.onRedisMessage);
    this.wss.on("connection", (ws, req) =>
    this.onConnection(ws as WebsocketEnhanced, req)
    );

    /** Handle heartbeat */
    setInterval(this.handleHeartbeat, config.interval_ms);
    setInterval(this.onHeartbeat, this.config.interval_ms);
    }

    handleConnection = (ws: Websocket) => {
    /** Decode user id and generate socket id */
    let room_id = "13123";
    let socket_id = Math.random().toString();
    initWS(ws: WebsocketEnhanced) {
    ws.is_alive = true;
    ws.id = v4();
    }

    this.initializeConnection(ws, room_id, socket_id);
    join = (room_id: string, ws: WebsocketEnhanced) => {
    console.log("JOIN");
    ws.room_id = room_id;
    if (this.ROOM_MAP.hasOwnProperty(room_id)) {
    this.ROOM_MAP[room_id].push(ws);
    } else {
    this.ROOM_MAP[room_id] = [ws];
    this.onJoinNewRoom(room_id);
    }
    };

    console.log("CONNECTED", this.roomToSocketMap);
    pong = (ws: WebsocketEnhanced) => {
    ws.is_alive = true;
    };

    ws.on("pong", () => this.handlePong(room_id, ws));
    close = (ws: WebsocketEnhanced) => {
    this.leaveRoom(ws);
    ws.terminate();
    };

    ws.on("close", () =>
    this.handleDisconnectionBy(room_id, (data) => data.ws == ws)
    );
    leaveRoom = (ws: WebsocketEnhanced) => {
    let room_id = ws.room_id;
    if (room_id && this.ROOM_MAP[room_id]) {
    this.ROOM_MAP[room_id] = this.ROOM_MAP[room_id].filter(
    (client) => client !== ws
    );

    ws.send("SYNC");
    if (this.ROOM_MAP[room_id].length == 0) {
    delete this.ROOM_MAP[room_id];
    this.onLastLeaveRoom(room_id);
    }
    }
    };

    ws.on("message", () => {
    this.pub.publish(
    this.pubSubChannelId(room_id),
    JSON.stringify({ socket_id })
    );
    });
    onJoinNewRoom = (room_id: string) => {
    console.log("JOIN NEW ROOM");
    this.sub.subscribe(this.pubSubChannelId(room_id));
    };

    onLastLeaveRoom = (room_id: string) => {
    console.log("LAST LEAVE ROOM");
    this.sub.unsubscribe(this.pubSubChannelId(room_id));
    };

    pubSubChannelId = (room_id: string) =>
    [this.config.redis_pubsub_name, room_id].join(":");

    handleHeartbeat = () => {
    isCorrectChannel(channel: string) {
    return channel == this.config.redis_pubsub_name;
    }

    socketsInRoom = (room_id: string): WebsocketEnhanced[] =>
    (this.ROOM_MAP[room_id] as WebsocketEnhanced[]) || [];

    onHeartbeat = () => {
    console.log("HEARTBEAT");
    let room_ids = Object.keys(this.roomToSocketMap);
    room_ids.forEach((room_id) =>
    this.handleDisconnectionBy(room_id, (data) => !data.is_alive)
    );
    this.wss.clients.forEach((ss) => {
    let ws = ss as WebsocketEnhanced;
    if (!ws.is_alive) {
    this.close(ws);
    }
    ws.is_alive = false;
    ws.ping(noop);
    });
    };

    handleDisconnectionBy = (
    room_id: string,
    termination_handler: (data: SocketObject) => boolean
    ) => {
    this.roomToSocketMap[room_id] &&
    this.roomToSocketMap[room_id].forEach((data, index) => {
    if (termination_handler(data)) {
    data.ws.terminate();
    this.roomToSocketMap[room_id].splice(index, 1);

    if (!this.roomToSocketMap[room_id].length) {
    delete this.roomToSocketMap[room_id];
    this.sub.unsubscribe(this.pubSubChannelId(room_id));
    }

    console.log("DISCONNECTED", this.roomToSocketMap);
    return;
    }
    onConnection = (ws: WebsocketEnhanced, req: http.IncomingMessage) => {
    /** Decode user id and generate socket id */
    let room_id = req.headers.decoded as string;

    data.is_alive = false;
    data.ws.ping(noop);
    });
    this.initWS(ws);
    this.join(room_id, ws);

    ws.on("pong", () => this.pong(ws));
    ws.on("close", () => this.close(ws));
    ws.send(JSON.stringify({ type: "SYNC" }));
    ws.on("message", (msg) => this.onMessage(msg as string, ws));
    };

    private initializeConnection = (
    ws: Websocket,
    room_id: string,
    socket_id: string
    ) => {
    if (!this.roomToSocketMap.hasOwnProperty(room_id)) {
    this.roomToSocketMap[room_id] = [];
    this.sub.subscribe(this.pubSubChannelId(room_id));
    onMessage = (msg: string, ws: WebsocketEnhanced) => {
    try {
    let parsed: { type: string } = JSON.parse(msg);
    if (parsed.type == "SYNC") {
    this.broadcastEmit(sync_payload, ws.room_id, ws);
    }
    } catch {
    console.log("INVALID EVENT SENT FROM CLIENT");
    }

    this.roomToSocketMap[room_id].push({ ws, id: socket_id, is_alive: true });
    };

    private handlePong = (room_id: string, ws: Websocket) => {
    let socket = this.roomToSocketMap[room_id].find((data) => data.ws == ws);
    if (socket) {
    socket.is_alive = true;
    }
    broadcastEmit = (payload: string, room_id: string, ws: WebsocketEnhanced) => {
    this.pub.publish(
    this.pubSubChannelId(ws.room_id),
    JSON.stringify({
    sender_ws_id: ws.id,
    type: "BROADCAST_EMIT",
    payload,
    })
    );
    };

    private handleRedisMessage = (channel: string, data: string) => {
    const [type, room_id] = channel.split(":");
    if (
    type == this.config.redis_pubsub_name &&
    this.roomToSocketMap.hasOwnProperty(room_id)
    ) {
    onRedisMessage = (channel: string, data: string) => {
    const [pubsub_name, room_id] = channel.split(":");
    if (this.isCorrectChannel(pubsub_name)) {
    try {
    let { sender_socket_id }: { sender_socket_id: string } = JSON.parse(
    data
    );
    this.roomToSocketMap[room_id].forEach((data) => {
    if (data.id !== sender_socket_id) {
    data.ws.send(SYNC);
    }
    });
    let {
    type,
    sender_ws_id,
    payload,
    }: {
    type: string;
    payload: string;
    sender_ws_id: string;
    } = JSON.parse(data);

    switch (type) {
    case "BROADCAST_EMIT":
    this.socketsInRoom(room_id).forEach((ws) => {
    if (ws.id !== sender_ws_id && ws.readyState == Websocket.OPEN) {
    console.log("SEND SYNC");
    ws.send(payload);
    }
    });
    return;
    default:
    return;
    }
    } catch {
    console.error("Parsing data error inside Redis sub");
    }
    }
    };
    }

    new WebsocketHandler({
    interval_ms: 15000,
    websocket_config: { port: Number(process.env.PORT) || 8080 },
    let config: Config = {
    interval_ms: 3000,
    websocket_config: {
    port: Number(process.env.PORT) || 8080,
    verifyClient: (info, done) => {
    let token = info.req.headers["sec-websocket-protocol"];
    if (token) {
    try {
    info.req.headers.decoded = token;
    return done(true);
    } catch {
    return done(false, 401, "Unauthorized");
    }
    }

    return done(false, 401, "Unauthorized");
    },
    },
    redis_pubsub_name: "SESSION_PUBSUB",
    });
    };

    new WebsocketHandler(config);
  16. Philip Young revised this gist Dec 31, 2020. 1 changed file with 16 additions and 18 deletions.
    34 changes: 16 additions & 18 deletions Websocket Redis PubSub.ts
    Original file line number Diff line number Diff line change
    @@ -18,7 +18,7 @@ class WebsocketHandler {
    pub: redis.RedisClient;
    sub: redis.RedisClient;
    wss: Websocket.Server;
    userSockets: Record<string, SocketObject[]> = {};
    roomToSocketMap: Record<string, SocketObject[]> = {};
    config: Config;
    constructor(config: Config = {}) {
    this.config = {
    @@ -51,7 +51,7 @@ class WebsocketHandler {

    this.initializeConnection(ws, room_id, socket_id);

    console.log("CONNECTED", this.userSockets);
    console.log("CONNECTED", this.roomToSocketMap);

    ws.on("pong", () => this.handlePong(room_id, ws));

    @@ -74,7 +74,7 @@ class WebsocketHandler {

    handleHeartbeat = () => {
    console.log("HEARTBEAT");
    let room_ids = Object.keys(this.userSockets);
    let room_ids = Object.keys(this.roomToSocketMap);
    room_ids.forEach((room_id) =>
    this.handleDisconnectionBy(room_id, (data) => !data.is_alive)
    );
    @@ -84,18 +84,18 @@ class WebsocketHandler {
    room_id: string,
    termination_handler: (data: SocketObject) => boolean
    ) => {
    this.userSockets[room_id] &&
    this.userSockets[room_id].forEach((data, index) => {
    this.roomToSocketMap[room_id] &&
    this.roomToSocketMap[room_id].forEach((data, index) => {
    if (termination_handler(data)) {
    data.ws.terminate();
    this.userSockets[room_id].splice(index, 1);
    this.roomToSocketMap[room_id].splice(index, 1);

    if (!this.userSockets[room_id].length) {
    delete this.userSockets[room_id];
    if (!this.roomToSocketMap[room_id].length) {
    delete this.roomToSocketMap[room_id];
    this.sub.unsubscribe(this.pubSubChannelId(room_id));
    }

    console.log("DISCONNECTED", this.userSockets);
    console.log("DISCONNECTED", this.roomToSocketMap);
    return;
    }

    @@ -109,16 +109,16 @@ class WebsocketHandler {
    room_id: string,
    socket_id: string
    ) => {
    if (!this.userSockets.hasOwnProperty(room_id)) {
    this.userSockets[room_id] = [];
    if (!this.roomToSocketMap.hasOwnProperty(room_id)) {
    this.roomToSocketMap[room_id] = [];
    this.sub.subscribe(this.pubSubChannelId(room_id));
    }

    this.userSockets[room_id].push({ ws, id: socket_id, is_alive: true });
    this.roomToSocketMap[room_id].push({ ws, id: socket_id, is_alive: true });
    };

    private handlePong = (room_id: string, ws: Websocket) => {
    let socket = this.userSockets[room_id].find((data) => data.ws == ws);
    let socket = this.roomToSocketMap[room_id].find((data) => data.ws == ws);
    if (socket) {
    socket.is_alive = true;
    }
    @@ -128,16 +128,14 @@ class WebsocketHandler {
    const [type, room_id] = channel.split(":");
    if (
    type == this.config.redis_pubsub_name &&
    this.userSockets.hasOwnProperty(room_id)
    this.roomToSocketMap.hasOwnProperty(room_id)
    ) {
    try {
    let { sender_socket_id }: { sender_socket_id: string } = JSON.parse(
    data
    );
    this.userSockets[room_id].forEach((data) => {
    console.log({ sender: sender_socket_id, receiver: data.id });
    this.roomToSocketMap[room_id].forEach((data) => {
    if (data.id !== sender_socket_id) {
    console.log("PROCESSED");
    data.ws.send(SYNC);
    }
    });
    @@ -149,7 +147,7 @@ class WebsocketHandler {
    }

    new WebsocketHandler({
    interval_ms: 3000,
    interval_ms: 15000,
    websocket_config: { port: Number(process.env.PORT) || 8080 },
    redis_pubsub_name: "SESSION_PUBSUB",
    });
  17. Philip Young renamed this gist Dec 31, 2020. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  18. Philip Young revised this gist Dec 31, 2020. No changes.
  19. Philip Young created this gist Dec 31, 2020.
    155 changes: 155 additions & 0 deletions Websocket Redis PubSub
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,155 @@
    const SYNC = "SYNC";
    import * as redis from "redis";
    import * as Websocket from "ws";

    function noop() {}

    type SocketObject = { id: string; is_alive: boolean; ws: Websocket };

    type Config = {
    interval_ms?: number;
    websocket_config?: Websocket.ServerOptions;
    redis_pub_config?: redis.ClientOpts;
    redis_sub_config?: redis.ClientOpts;
    redis_pubsub_name?: string;
    };

    class WebsocketHandler {
    pub: redis.RedisClient;
    sub: redis.RedisClient;
    wss: Websocket.Server;
    userSockets: Record<string, SocketObject[]> = {};
    config: Config;
    constructor(config: Config = {}) {
    this.config = {
    ...{
    interval_ms: 30000,
    websocket_config: { port: 8080 },
    redis_pub_config: {},
    redis_sub_config: {},
    redis_pubsub_name: "REDIS_PUBSUB",
    },
    ...config,
    };
    /** Initialize pub sub and wss */
    this.pub = redis.createClient(config.redis_pub_config);
    this.sub = redis.createClient(config.redis_sub_config);
    this.wss = new Websocket.Server(config.websocket_config);

    /** Subscribe to redis and handle connection */
    this.sub.on("message", this.handleRedisMessage);
    this.wss.on("connection", this.handleConnection);

    /** Handle heartbeat */
    setInterval(this.handleHeartbeat, config.interval_ms);
    }

    handleConnection = (ws: Websocket) => {
    /** Decode user id and generate socket id */
    let room_id = "13123";
    let socket_id = Math.random().toString();

    this.initializeConnection(ws, room_id, socket_id);

    console.log("CONNECTED", this.userSockets);

    ws.on("pong", () => this.handlePong(room_id, ws));

    ws.on("close", () =>
    this.handleDisconnectionBy(room_id, (data) => data.ws == ws)
    );

    ws.send("SYNC");

    ws.on("message", () => {
    this.pub.publish(
    this.pubSubChannelId(room_id),
    JSON.stringify({ socket_id })
    );
    });
    };

    pubSubChannelId = (room_id: string) =>
    [this.config.redis_pubsub_name, room_id].join(":");

    handleHeartbeat = () => {
    console.log("HEARTBEAT");
    let room_ids = Object.keys(this.userSockets);
    room_ids.forEach((room_id) =>
    this.handleDisconnectionBy(room_id, (data) => !data.is_alive)
    );
    };

    handleDisconnectionBy = (
    room_id: string,
    termination_handler: (data: SocketObject) => boolean
    ) => {
    this.userSockets[room_id] &&
    this.userSockets[room_id].forEach((data, index) => {
    if (termination_handler(data)) {
    data.ws.terminate();
    this.userSockets[room_id].splice(index, 1);

    if (!this.userSockets[room_id].length) {
    delete this.userSockets[room_id];
    this.sub.unsubscribe(this.pubSubChannelId(room_id));
    }

    console.log("DISCONNECTED", this.userSockets);
    return;
    }

    data.is_alive = false;
    data.ws.ping(noop);
    });
    };

    private initializeConnection = (
    ws: Websocket,
    room_id: string,
    socket_id: string
    ) => {
    if (!this.userSockets.hasOwnProperty(room_id)) {
    this.userSockets[room_id] = [];
    this.sub.subscribe(this.pubSubChannelId(room_id));
    }

    this.userSockets[room_id].push({ ws, id: socket_id, is_alive: true });
    };

    private handlePong = (room_id: string, ws: Websocket) => {
    let socket = this.userSockets[room_id].find((data) => data.ws == ws);
    if (socket) {
    socket.is_alive = true;
    }
    };

    private handleRedisMessage = (channel: string, data: string) => {
    const [type, room_id] = channel.split(":");
    if (
    type == this.config.redis_pubsub_name &&
    this.userSockets.hasOwnProperty(room_id)
    ) {
    try {
    let { sender_socket_id }: { sender_socket_id: string } = JSON.parse(
    data
    );
    this.userSockets[room_id].forEach((data) => {
    console.log({ sender: sender_socket_id, receiver: data.id });
    if (data.id !== sender_socket_id) {
    console.log("PROCESSED");
    data.ws.send(SYNC);
    }
    });
    } catch {
    console.error("Parsing data error inside Redis sub");
    }
    }
    };
    }

    new WebsocketHandler({
    interval_ms: 3000,
    websocket_config: { port: Number(process.env.PORT) || 8080 },
    redis_pubsub_name: "SESSION_PUBSUB",
    });