Last active
January 2, 2021 12:32
-
-
Save philipyoungg/322b94e9fd0e1388eaa68b86f3ce76eb to your computer and use it in GitHub Desktop.
Revisions
-
Philip Young revised this gist
Jan 2, 2021 . 1 changed file with 66 additions and 83 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -5,9 +5,11 @@ import * as http from "http"; function noop() {} enum WebsocketSendType { BROADCAST_EMIT = "BROADCAST_EMIT", } export type WebsocketPubsubConfig = { interval_ms?: number; websocket_config?: Websocket.ServerOptions; redis_config?: redis.ClientOpts; @@ -24,18 +26,20 @@ type WebsocketEnhanced = Websocket & { id: string; is_alive: boolean; room_id?: string; broadcastEmit: (payload: string, room_id: string) => void; joinRoom: (room_id: string) => void; }; /** * Currently only handle one room per user */ class WebsocketPubSub { private pub: redis.RedisClient; private sub: redis.RedisClient; private wss: Websocket.Server; private ROOM_MAP: Record<string, Websocket[]> = {}; private config: WebsocketPubsubConfig; constructor(__RAW_CONFIG__: WebsocketPubsubConfig = {}) { this.config = { ...{ interval_ms: 30000, @@ -61,26 +65,50 @@ class WebsocketHandler { } onConnection = (ws: WebsocketEnhanced, decoded: unknown) => { console.log("onConnection not handled"); }; onMessage = (msg: string, ws: WebsocketEnhanced) => { console.log("onMessage not handled"); }; private joinRoom = (room_id: string, ws: WebsocketEnhanced) => { // console.log("JOIN"); ws.room_id = room_id; if (this.ROOM_MAP.hasOwnProperty(room_id)) { this.ROOM_MAP[room_id].push(ws); } else { this.ROOM_MAP[room_id] = [ws]; this.onJoinNewRoom(room_id); } }; private broadcastEmit = ( payload: string, room_id: string, ws: WebsocketEnhanced ) => { try { let pubsubPayload: PubsubPayload = { sender_ws_id: ws.id, type: WebsocketSendType.BROADCAST_EMIT, payload, }; this.pub.publish( this.pubSubChannelId(ws.room_id), JSON.stringify(pubsubPayload) ); } catch (err) { console.error("Parsing data error inside broadcast emit", err); } }; /** Internal APIs */ private setupAndOnConnection = ( ws: WebsocketEnhanced, req: http.IncomingMessage ) => { /** Decode user id and generate socket id */ let decoded = req.headers.decoded; @@ -91,22 +119,25 @@ class WebsocketHandler { this.onConnection(ws, decoded); }; private initWS(ws: WebsocketEnhanced) { ws.is_alive = true; ws.id = v4(); ws.broadcastEmit = (payload: string, room_id: string) => this.broadcastEmit(payload, room_id, ws); ws.joinRoom = (room_id: string) => this.joinRoom(room_id, ws); } private pong = (ws: WebsocketEnhanced) => { ws.is_alive = true; }; private close = (ws: WebsocketEnhanced) => { this.leaveRoom(ws); ws.terminate(); }; private onHeartbeat = () => { console.log("HEARTBEAT", this.wss.clients.size); let clients = this.wss.clients as Set<WebsocketEnhanced>; clients.forEach((ss) => { let ws = ss as WebsocketEnhanced; @@ -118,18 +149,7 @@ class WebsocketHandler { }); }; private leaveRoom = (ws: WebsocketEnhanced) => { let room_id = ws.room_id; if (!room_id || !this.ROOM_MAP[room_id]) { @@ -146,53 +166,37 @@ class WebsocketHandler { } }; private onJoinNewRoom = (room_id: string) => { console.log("JOIN NEW ROOM"); this.sub.subscribe(this.pubSubChannelId(room_id)); }; private onLastLeaveRoom = (room_id: string) => { console.log("LAST LEAVE ROOM"); this.sub.unsubscribe(this.pubSubChannelId(room_id)); }; private pubSubChannelId = (room_id: string) => [this.config.redis_pubsub_name, room_id].join(":"); private isCorrectChannel(channel: string) { return channel == this.config.redis_pubsub_name; } private socketsInRoom = (room_id: string): WebsocketEnhanced[] => (this.ROOM_MAP[room_id] as WebsocketEnhanced[]) || []; private onRedisMessage = (channel: string, data: string) => { const [pubsub_name, room_id] = channel.split(":"); if (this.isCorrectChannel(pubsub_name)) { try { let { type, sender_ws_id, payload } = JSON.parse(data) as PubsubPayload; switch (type) { case WebsocketSendType.BROADCAST_EMIT: this.socketsInRoom(room_id).forEach((ws) => { if (ws.id !== sender_ws_id && ws.readyState == Websocket.OPEN) { // console.log("BROADCAST EMIT ON ", payload); ws.send(payload); } }); @@ -207,25 +211,4 @@ class WebsocketHandler { }; } export default WebsocketPubSub; -
Philip Young revised this gist
Jan 1, 2021 . 1 changed file with 4 additions and 3 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -60,7 +60,8 @@ class WebsocketHandler { setInterval(this.onHeartbeat, this.config.interval_ms); } onConnection = (ws: WebsocketEnhanced, decoded: unknown) => { let user_id = decoded as string; this.joinRoom(user_id, ws); ws.send(sync_payload); ws.on("message", (msg) => this.onMessage(msg as string, ws)); @@ -81,13 +82,13 @@ class WebsocketHandler { setupAndOnConnection = (ws: WebsocketEnhanced, req: http.IncomingMessage) => { /** Decode user id and generate socket id */ let decoded = req.headers.decoded; this.initWS(ws); ws.on("pong", () => this.pong(ws)); ws.on("close", () => this.close(ws)); this.onConnection(ws, decoded); }; initWS(ws: WebsocketEnhanced) { -
Philip Young revised this gist
Jan 1, 2021 . 1 changed file with 14 additions and 10 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -53,22 +53,15 @@ class WebsocketHandler { /** Subscribe to redis and handle connection */ this.sub.on("message", this.onRedisMessage); this.wss.on("connection", (ws, req) => this.setupAndOnConnection(ws as WebsocketEnhanced, req) ); /** Handle heartbeat */ setInterval(this.onHeartbeat, this.config.interval_ms); } onConnection = (ws: WebsocketEnhanced, user_id: string) => { this.joinRoom(user_id, ws); ws.send(sync_payload); ws.on("message", (msg) => this.onMessage(msg as string, ws)); }; @@ -86,6 +79,17 @@ class WebsocketHandler { /** Internal APIs */ setupAndOnConnection = (ws: WebsocketEnhanced, req: http.IncomingMessage) => { /** Decode user id and generate socket id */ let room_id = req.headers.decoded as string; this.initWS(ws); ws.on("pong", () => this.pong(ws)); ws.on("close", () => this.close(ws)); this.onConnection(ws, room_id); }; initWS(ws: WebsocketEnhanced) { ws.is_alive = true; ws.id = v4(); -
Philip Young revised this gist
Jan 1, 2021 . 1 changed file with 48 additions and 46 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -60,11 +60,59 @@ class WebsocketHandler { setInterval(this.onHeartbeat, this.config.interval_ms); } onConnection = (ws: WebsocketEnhanced, req: http.IncomingMessage) => { /** Decode user id and generate socket id */ let room_id = req.headers.decoded as string; this.initWS(ws); this.joinRoom(room_id, ws); ws.on("pong", () => this.pong(ws)); ws.on("close", () => this.close(ws)); ws.send(sync_payload); ws.on("message", (msg) => this.onMessage(msg as string, ws)); }; onMessage = (msg: string, ws: WebsocketEnhanced) => { try { let parsed: { type: string } = JSON.parse(msg); if (parsed.type == "SYNC") { this.broadcastEmit(sync_payload, ws.room_id, ws); } } catch { console.log("INVALID EVENT SENT FROM CLIENT"); } }; /** Internal APIs */ initWS(ws: WebsocketEnhanced) { ws.is_alive = true; ws.id = v4(); } pong = (ws: WebsocketEnhanced) => { ws.is_alive = true; }; close = (ws: WebsocketEnhanced) => { this.leaveRoom(ws); ws.terminate(); }; onHeartbeat = () => { console.log("HEARTBEAT"); let clients = this.wss.clients as Set<WebsocketEnhanced>; clients.forEach((ss) => { let ws = ss as WebsocketEnhanced; if (!ws.is_alive) { this.close(ws); } ws.is_alive = false; ws.ping(noop); }); }; joinRoom = (room_id: string, ws: WebsocketEnhanced) => { console.log("JOIN"); ws.room_id = room_id; @@ -76,15 +124,6 @@ class WebsocketHandler { } }; leaveRoom = (ws: WebsocketEnhanced) => { let room_id = ws.room_id; @@ -122,43 +161,6 @@ class WebsocketHandler { socketsInRoom = (room_id: string): WebsocketEnhanced[] => (this.ROOM_MAP[room_id] as WebsocketEnhanced[]) || []; broadcastEmit = (payload: string, room_id: string, ws: WebsocketEnhanced) => { try { let pubsubPayload: PubsubPayload = { -
Philip Young revised this gist
Jan 1, 2021 . 1 changed file with 2 additions and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -65,7 +65,7 @@ class WebsocketHandler { ws.id = v4(); } joinRoom = (room_id: string, ws: WebsocketEnhanced) => { console.log("JOIN"); ws.room_id = room_id; if (this.ROOM_MAP.hasOwnProperty(room_id)) { @@ -140,7 +140,7 @@ class WebsocketHandler { let room_id = req.headers.decoded as string; this.initWS(ws); this.joinRoom(room_id, ws); ws.on("pong", () => this.pong(ws)); ws.on("close", () => this.close(ws)); -
Philip Young revised this gist
Jan 1, 2021 . 1 changed file with 3 additions and 3 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -35,19 +35,19 @@ class WebsocketHandler { wss: Websocket.Server; ROOM_MAP: Record<string, Websocket[]> = {}; config: WebsocketHandlerConfig; constructor(__RAW_CONFIG__: WebsocketHandlerConfig = {}) { this.config = { ...{ interval_ms: 30000, websocket_config: { port: 8080 }, redis_config: {}, redis_pubsub_name: "REDIS_PUBSUB", }, ...__RAW_CONFIG__, }; /** Initialize pub sub and wss */ this.pub = redis.createClient(this.config.redis_config); this.sub = this.pub.duplicate(); this.wss = new Websocket.Server(this.config.websocket_config); /** Subscribe to redis and handle connection */ -
Philip Young revised this gist
Jan 1, 2021 . 1 changed file with 5 additions and 5 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -35,20 +35,20 @@ class WebsocketHandler { wss: Websocket.Server; ROOM_MAP: Record<string, Websocket[]> = {}; config: WebsocketHandlerConfig; constructor(__unsafe_config: WebsocketHandlerConfig = {}) { this.config = { ...{ interval_ms: 30000, websocket_config: { port: 8080 }, redis_config: {}, redis_pubsub_name: "REDIS_PUBSUB", }, ...__unsafe_config, }; /** Initialize pub sub and wss */ this.pub = redis.createClient(this.config.redis_config); this.sub = redis.createClient(this.config.redis_config); this.wss = new Websocket.Server(this.config.websocket_config); /** Subscribe to redis and handle connection */ this.sub.on("message", this.onRedisMessage); -
Philip Young renamed this gist
Jan 1, 2021 . 1 changed file with 0 additions and 0 deletions.There are no files selected for viewing
File renamed without changes. -
Philip Young renamed this gist
Jan 1, 2021 . 1 changed file with 0 additions and 0 deletions.There are no files selected for viewing
File renamed without changes. -
Philip Young revised this gist
Jan 1, 2021 . No changes.There are no files selected for viewing
-
Philip Young revised this gist
Jan 1, 2021 . No changes.There are no files selected for viewing
-
Philip Young revised this gist
Jan 1, 2021 . 1 changed file with 22 additions and 19 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -7,13 +7,19 @@ function noop() {} const sync_payload = JSON.stringify({ type: "SYNC" }); type WebsocketHandlerConfig = { interval_ms?: number; websocket_config?: Websocket.ServerOptions; redis_config?: redis.ClientOpts; redis_pubsub_name?: string; }; type PubsubPayload = { type: string; payload: string; sender_ws_id: string; }; type WebsocketEnhanced = Websocket & { id: string; is_alive: boolean; @@ -28,8 +34,8 @@ class WebsocketHandler { sub: redis.RedisClient; wss: Websocket.Server; ROOM_MAP: Record<string, Websocket[]> = {}; config: WebsocketHandlerConfig; constructor(config: WebsocketHandlerConfig = {}) { this.config = { ...{ interval_ms: 30000, @@ -138,7 +144,7 @@ class WebsocketHandler { ws.on("pong", () => this.pong(ws)); ws.on("close", () => this.close(ws)); ws.send(sync_payload); ws.on("message", (msg) => this.onMessage(msg as string, ws)); }; @@ -154,29 +160,26 @@ class WebsocketHandler { }; broadcastEmit = (payload: string, room_id: string, ws: WebsocketEnhanced) => { try { let pubsubPayload: PubsubPayload = { sender_ws_id: ws.id, type: "BROADCAST_EMIT", payload, }; this.pub.publish( this.pubSubChannelId(ws.room_id), JSON.stringify(pubsubPayload) ); } catch (err) { console.error("Parsing data error inside broadcast emit", err); } }; onRedisMessage = (channel: string, data: string) => { const [pubsub_name, room_id] = channel.split(":"); if (this.isCorrectChannel(pubsub_name)) { try { let { type, sender_ws_id, payload } = JSON.parse(data) as PubsubPayload; switch (type) { case "BROADCAST_EMIT": @@ -197,7 +200,7 @@ class WebsocketHandler { }; } let config: WebsocketHandlerConfig = { interval_ms: 3000, websocket_config: { port: Number(process.env.PORT) || 8080, -
Philip Young revised this gist
Jan 1, 2021 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -182,7 +182,7 @@ class WebsocketHandler { case "BROADCAST_EMIT": this.socketsInRoom(room_id).forEach((ws) => { if (ws.id !== sender_ws_id && ws.readyState == Websocket.OPEN) { console.log("BROADCAST EMIT ON ", payload); ws.send(payload); } }); -
Philip Young revised this gist
Jan 1, 2021 . 1 changed file with 17 additions and 10 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -20,6 +20,9 @@ type WebsocketEnhanced = Websocket & { room_id?: string; }; /** * Currently only handle one room per user */ class WebsocketHandler { pub: redis.RedisClient; sub: redis.RedisClient; @@ -78,15 +81,18 @@ class WebsocketHandler { leaveRoom = (ws: WebsocketEnhanced) => { let room_id = ws.room_id; if (!room_id || !this.ROOM_MAP[room_id]) { return; } this.ROOM_MAP[room_id] = this.ROOM_MAP[room_id].filter( (client) => client !== ws ); if (this.ROOM_MAP[room_id].length == 0) { delete this.ROOM_MAP[room_id]; this.onLastLeaveRoom(room_id); } }; @@ -112,7 +118,8 @@ class WebsocketHandler { onHeartbeat = () => { console.log("HEARTBEAT"); let clients = this.wss.clients as Set<WebsocketEnhanced>; clients.forEach((ss) => { let ws = ss as WebsocketEnhanced; if (!ws.is_alive) { this.close(ws); -
Philip Young revised this gist
Jan 1, 2021 . 1 changed file with 149 additions and 88 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,153 +1,214 @@ import * as redis from "redis"; import * as Websocket from "ws"; import { v4 } from "uuid"; import * as http from "http"; function noop() {} const sync_payload = JSON.stringify({ type: "SYNC" }); type Config = { interval_ms?: number; websocket_config?: Websocket.ServerOptions; redis_config?: redis.ClientOpts; redis_pubsub_name?: string; }; type WebsocketEnhanced = Websocket & { id: string; is_alive: boolean; room_id?: string; }; class WebsocketHandler { pub: redis.RedisClient; sub: redis.RedisClient; wss: Websocket.Server; ROOM_MAP: Record<string, Websocket[]> = {}; config: Config; constructor(config: Config = {}) { this.config = { ...{ interval_ms: 30000, websocket_config: { port: 8080 }, redis_config: {}, redis_pubsub_name: "REDIS_PUBSUB", }, ...config, }; /** Initialize pub sub and wss */ this.pub = redis.createClient(config.redis_config); this.sub = redis.createClient(config.redis_config); this.wss = new Websocket.Server(config.websocket_config); /** Subscribe to redis and handle connection */ this.sub.on("message", this.onRedisMessage); this.wss.on("connection", (ws, req) => this.onConnection(ws as WebsocketEnhanced, req) ); /** Handle heartbeat */ setInterval(this.onHeartbeat, this.config.interval_ms); } initWS(ws: WebsocketEnhanced) { ws.is_alive = true; ws.id = v4(); } join = (room_id: string, ws: WebsocketEnhanced) => { console.log("JOIN"); ws.room_id = room_id; if (this.ROOM_MAP.hasOwnProperty(room_id)) { this.ROOM_MAP[room_id].push(ws); } else { this.ROOM_MAP[room_id] = [ws]; this.onJoinNewRoom(room_id); } }; pong = (ws: WebsocketEnhanced) => { ws.is_alive = true; }; close = (ws: WebsocketEnhanced) => { this.leaveRoom(ws); ws.terminate(); }; leaveRoom = (ws: WebsocketEnhanced) => { let room_id = ws.room_id; if (room_id && this.ROOM_MAP[room_id]) { this.ROOM_MAP[room_id] = this.ROOM_MAP[room_id].filter( (client) => client !== ws ); if (this.ROOM_MAP[room_id].length == 0) { delete this.ROOM_MAP[room_id]; this.onLastLeaveRoom(room_id); } } }; onJoinNewRoom = (room_id: string) => { console.log("JOIN NEW ROOM"); this.sub.subscribe(this.pubSubChannelId(room_id)); }; onLastLeaveRoom = (room_id: string) => { console.log("LAST LEAVE ROOM"); this.sub.unsubscribe(this.pubSubChannelId(room_id)); }; pubSubChannelId = (room_id: string) => [this.config.redis_pubsub_name, room_id].join(":"); isCorrectChannel(channel: string) { return channel == this.config.redis_pubsub_name; } socketsInRoom = (room_id: string): WebsocketEnhanced[] => (this.ROOM_MAP[room_id] as WebsocketEnhanced[]) || []; onHeartbeat = () => { console.log("HEARTBEAT"); this.wss.clients.forEach((ss) => { let ws = ss as WebsocketEnhanced; if (!ws.is_alive) { this.close(ws); } ws.is_alive = false; ws.ping(noop); }); }; onConnection = (ws: WebsocketEnhanced, req: http.IncomingMessage) => { /** Decode user id and generate socket id */ let room_id = req.headers.decoded as string; this.initWS(ws); this.join(room_id, ws); ws.on("pong", () => this.pong(ws)); ws.on("close", () => this.close(ws)); ws.send(JSON.stringify({ type: "SYNC" })); ws.on("message", (msg) => this.onMessage(msg as string, ws)); }; onMessage = (msg: string, ws: WebsocketEnhanced) => { try { let parsed: { type: string } = JSON.parse(msg); if (parsed.type == "SYNC") { this.broadcastEmit(sync_payload, ws.room_id, ws); } } catch { console.log("INVALID EVENT SENT FROM CLIENT"); } }; broadcastEmit = (payload: string, room_id: string, ws: WebsocketEnhanced) => { this.pub.publish( this.pubSubChannelId(ws.room_id), JSON.stringify({ sender_ws_id: ws.id, type: "BROADCAST_EMIT", payload, }) ); }; onRedisMessage = (channel: string, data: string) => { const [pubsub_name, room_id] = channel.split(":"); if (this.isCorrectChannel(pubsub_name)) { try { let { type, sender_ws_id, payload, }: { type: string; payload: string; sender_ws_id: string; } = JSON.parse(data); switch (type) { case "BROADCAST_EMIT": this.socketsInRoom(room_id).forEach((ws) => { if (ws.id !== sender_ws_id && ws.readyState == Websocket.OPEN) { console.log("SEND SYNC"); ws.send(payload); } }); return; default: return; } } catch { console.error("Parsing data error inside Redis sub"); } } }; } let config: Config = { interval_ms: 3000, websocket_config: { port: Number(process.env.PORT) || 8080, verifyClient: (info, done) => { let token = info.req.headers["sec-websocket-protocol"]; if (token) { try { info.req.headers.decoded = token; return done(true); } catch { return done(false, 401, "Unauthorized"); } } return done(false, 401, "Unauthorized"); }, }, redis_pubsub_name: "SESSION_PUBSUB", }; new WebsocketHandler(config); -
Philip Young revised this gist
Dec 31, 2020 . 1 changed file with 16 additions and 18 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -18,7 +18,7 @@ class WebsocketHandler { pub: redis.RedisClient; sub: redis.RedisClient; wss: Websocket.Server; roomToSocketMap: Record<string, SocketObject[]> = {}; config: Config; constructor(config: Config = {}) { this.config = { @@ -51,7 +51,7 @@ class WebsocketHandler { this.initializeConnection(ws, room_id, socket_id); console.log("CONNECTED", this.roomToSocketMap); ws.on("pong", () => this.handlePong(room_id, ws)); @@ -74,7 +74,7 @@ class WebsocketHandler { handleHeartbeat = () => { console.log("HEARTBEAT"); let room_ids = Object.keys(this.roomToSocketMap); room_ids.forEach((room_id) => this.handleDisconnectionBy(room_id, (data) => !data.is_alive) ); @@ -84,18 +84,18 @@ class WebsocketHandler { room_id: string, termination_handler: (data: SocketObject) => boolean ) => { this.roomToSocketMap[room_id] && this.roomToSocketMap[room_id].forEach((data, index) => { if (termination_handler(data)) { data.ws.terminate(); this.roomToSocketMap[room_id].splice(index, 1); if (!this.roomToSocketMap[room_id].length) { delete this.roomToSocketMap[room_id]; this.sub.unsubscribe(this.pubSubChannelId(room_id)); } console.log("DISCONNECTED", this.roomToSocketMap); return; } @@ -109,16 +109,16 @@ class WebsocketHandler { room_id: string, socket_id: string ) => { if (!this.roomToSocketMap.hasOwnProperty(room_id)) { this.roomToSocketMap[room_id] = []; this.sub.subscribe(this.pubSubChannelId(room_id)); } this.roomToSocketMap[room_id].push({ ws, id: socket_id, is_alive: true }); }; private handlePong = (room_id: string, ws: Websocket) => { let socket = this.roomToSocketMap[room_id].find((data) => data.ws == ws); if (socket) { socket.is_alive = true; } @@ -128,16 +128,14 @@ class WebsocketHandler { const [type, room_id] = channel.split(":"); if ( type == this.config.redis_pubsub_name && this.roomToSocketMap.hasOwnProperty(room_id) ) { try { let { sender_socket_id }: { sender_socket_id: string } = JSON.parse( data ); this.roomToSocketMap[room_id].forEach((data) => { if (data.id !== sender_socket_id) { data.ws.send(SYNC); } }); @@ -149,7 +147,7 @@ class WebsocketHandler { } new WebsocketHandler({ interval_ms: 15000, websocket_config: { port: Number(process.env.PORT) || 8080 }, redis_pubsub_name: "SESSION_PUBSUB", }); -
Philip Young renamed this gist
Dec 31, 2020 . 1 changed file with 0 additions and 0 deletions.There are no files selected for viewing
File renamed without changes. -
Philip Young revised this gist
Dec 31, 2020 . No changes.There are no files selected for viewing
-
Philip Young created this gist
Dec 31, 2020 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,155 @@ const SYNC = "SYNC"; import * as redis from "redis"; import * as Websocket from "ws"; function noop() {} type SocketObject = { id: string; is_alive: boolean; ws: Websocket }; type Config = { interval_ms?: number; websocket_config?: Websocket.ServerOptions; redis_pub_config?: redis.ClientOpts; redis_sub_config?: redis.ClientOpts; redis_pubsub_name?: string; }; class WebsocketHandler { pub: redis.RedisClient; sub: redis.RedisClient; wss: Websocket.Server; userSockets: Record<string, SocketObject[]> = {}; config: Config; constructor(config: Config = {}) { this.config = { ...{ interval_ms: 30000, websocket_config: { port: 8080 }, redis_pub_config: {}, redis_sub_config: {}, redis_pubsub_name: "REDIS_PUBSUB", }, ...config, }; /** Initialize pub sub and wss */ this.pub = redis.createClient(config.redis_pub_config); this.sub = redis.createClient(config.redis_sub_config); this.wss = new Websocket.Server(config.websocket_config); /** Subscribe to redis and handle connection */ this.sub.on("message", this.handleRedisMessage); this.wss.on("connection", this.handleConnection); /** Handle heartbeat */ setInterval(this.handleHeartbeat, config.interval_ms); } handleConnection = (ws: Websocket) => { /** Decode user id and generate socket id */ let room_id = "13123"; let socket_id = Math.random().toString(); this.initializeConnection(ws, room_id, socket_id); console.log("CONNECTED", this.userSockets); ws.on("pong", () => this.handlePong(room_id, ws)); ws.on("close", () => this.handleDisconnectionBy(room_id, (data) => data.ws == ws) ); ws.send("SYNC"); ws.on("message", () => { this.pub.publish( this.pubSubChannelId(room_id), JSON.stringify({ socket_id }) ); }); }; pubSubChannelId = (room_id: string) => [this.config.redis_pubsub_name, room_id].join(":"); handleHeartbeat = () => { console.log("HEARTBEAT"); let room_ids = Object.keys(this.userSockets); room_ids.forEach((room_id) => this.handleDisconnectionBy(room_id, (data) => !data.is_alive) ); }; handleDisconnectionBy = ( room_id: string, termination_handler: (data: SocketObject) => boolean ) => { this.userSockets[room_id] && this.userSockets[room_id].forEach((data, index) => { if (termination_handler(data)) { data.ws.terminate(); this.userSockets[room_id].splice(index, 1); if (!this.userSockets[room_id].length) { delete this.userSockets[room_id]; this.sub.unsubscribe(this.pubSubChannelId(room_id)); } console.log("DISCONNECTED", this.userSockets); return; } data.is_alive = false; data.ws.ping(noop); }); }; private initializeConnection = ( ws: Websocket, room_id: string, socket_id: string ) => { if (!this.userSockets.hasOwnProperty(room_id)) { this.userSockets[room_id] = []; this.sub.subscribe(this.pubSubChannelId(room_id)); } this.userSockets[room_id].push({ ws, id: socket_id, is_alive: true }); }; private handlePong = (room_id: string, ws: Websocket) => { let socket = this.userSockets[room_id].find((data) => data.ws == ws); if (socket) { socket.is_alive = true; } }; private handleRedisMessage = (channel: string, data: string) => { const [type, room_id] = channel.split(":"); if ( type == this.config.redis_pubsub_name && this.userSockets.hasOwnProperty(room_id) ) { try { let { sender_socket_id }: { sender_socket_id: string } = JSON.parse( data ); this.userSockets[room_id].forEach((data) => { console.log({ sender: sender_socket_id, receiver: data.id }); if (data.id !== sender_socket_id) { console.log("PROCESSED"); data.ws.send(SYNC); } }); } catch { console.error("Parsing data error inside Redis sub"); } } }; } new WebsocketHandler({ interval_ms: 3000, websocket_config: { port: Number(process.env.PORT) || 8080 }, redis_pubsub_name: "SESSION_PUBSUB", });