Last active
August 17, 2025 13:14
-
-
Save ronnyroeller/6d244c5bcee2d71c346bf39c7fa04bad to your computer and use it in GitHub Desktop.
Revisions
-
ronnyroeller revised this gist
Aug 6, 2021 . 1 changed file with 149 additions and 149 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -5,184 +5,184 @@ import { scanItems } from './dynamodb'; const { AWS_REGION, TOPICS_TABLE } = process.env; const dynamoDb = new DynamoDB({ apiVersion: '2012-08-10', region: AWS_REGION, }); // Message structure and protocol flow taken from y-webrtc/bin/server.js interface YWebRtcSubscriptionMessage { type: 'subscribe' | 'unsubscribe'; topics?: string[]; } interface YWebRtcPingMessage { type: 'ping'; } interface YWebRtcPublishMessage { type: 'publish'; topic?: string; [k: string]: any; } async function subscribe(topic: string, connectionId: string) { try { return await dynamoDb .updateItem({ TableName: TOPICS_TABLE, Key: { name: { S: topic } }, UpdateExpression: 'ADD receivers :r', ExpressionAttributeValues: { ':r': { SS: [connectionId] }, }, }) .promise(); } catch (err) { console.log(`Cannot update topic ${topic}: ${err.message}`); } } async function unsubscribe(topic: string, connectionId: string) { try { return await dynamoDb .updateItem({ TableName: TOPICS_TABLE, Key: { name: { S: topic } }, UpdateExpression: 'DELETE receivers :r', ExpressionAttributeValues: { ':r': { SS: [connectionId] }, }, }) .promise(); } catch (err) { console.log(`Cannot update topic ${topic}: ${err.message}`); } } async function getReceivers(topic: string) { try { const { Item: item } = await dynamoDb .getItem({ TableName: TOPICS_TABLE, Key: { name: { S: topic } }, }) .promise(); return item?.receivers ? item.receivers.SS : []; } catch (err) { console.log(`Cannot get topic ${topic}: ${err.message}`); return []; } } async function handleYWebRtcMessage( connectionId: string, message: | YWebRtcSubscriptionMessage | YWebRtcPublishMessage | YWebRtcPingMessage, send: (receiver: string, message: any) => Promise<void>, ) { const promises = []; if (message && message.type) { switch (message.type) { case 'subscribe': (message.topics || []).forEach(topic => { promises.push(subscribe(topic, connectionId)); }); break; case 'unsubscribe': (message.topics || []).forEach(topic => { promises.push(unsubscribe(topic, connectionId)); }); break; case 'publish': if (message.topic) { const receivers = await getReceivers(message.topic); receivers.forEach(receiver => { promises.push(send(receiver, message)); }); } break; case 'ping': promises.push(send(connectionId, { type: 'pong' })); break; } } await Promise.all(promises); } function handleConnect(connectionId: string) { // Nothing to do console.log(`Connected: ${connectionId}`); } async function handleDisconnect(connectionId: string) { console.log(`Disconnected: ${connectionId}`); // Remove the connection from all topics // This is quite expensive, as we need to go through all topics in the table const promises = []; for await (const item of scanItems(dynamoDb, TOPICS_TABLE)) { const receivers = item.receivers?.SS ?? []; if (receivers.includes(connectionId)) { promises.push(unsubscribe(item.name.S, connectionId)); } } await Promise.all(promises); } export async function handler( event: HttpV2WebsocketEvent, ): Promise<HttpV2Response> { if (!TOPICS_TABLE) { return { statusCode: 502, body: 'Not configured' }; } // The AWS "simple chat" example uses event.requestContext.domainName/...stage, but that doesn't work with custom domain // names. It also doesn't matter, this is anyways an internal (AWS->AWS) call. const apigwManagementApi = new ApiGatewayManagementApi({ apiVersion: '2018-11-29', endpoint: `https://${event.requestContext.apiId}.execute-api.${AWS_REGION}.amazonaws.com/${event.requestContext.stage}`, }); const send = async (connectionId: string, message: any) => { try { await apigwManagementApi .postToConnection({ ConnectionId: connectionId, Data: JSON.stringify(message), }) .promise(); } catch (err) { if (err.statusCode === 410) { console.log(`Found stale connection, deleting ${connectionId}`); await handleDisconnect(connectionId); } else { // Log, but otherwise ignore: There's not much we can do, really. console.log(`Error when sending to ${connectionId}: ${err.message}`); } } }; try { switch (event.requestContext.routeKey) { case '$connect': handleConnect(event.requestContext.connectionId); break; case '$disconnect': await handleDisconnect(event.requestContext.connectionId); break; case '$default': await handleYWebRtcMessage( event.requestContext.connectionId, JSON.parse(event.body), send, ); break; } return { statusCode: 200 }; } catch (err) { console.log(`Error ${event.requestContext.connectionId}`, err); return { statusCode: 500, body: err.message }; } } -
ronnyroeller revised this gist
Aug 6, 2021 . 1 changed file with 152 additions and 169 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,205 +1,188 @@ import { ApiGatewayManagementApi, DynamoDB } from 'aws-sdk'; import { scanItems } from './dynamodb'; const { AWS_REGION, TOPICS_TABLE } = process.env; const dynamoDb = new DynamoDB({ apiVersion: '2012-08-10', region: AWS_REGION, }); // Message structure and protocol flow taken from y-webrtc/bin/server.js interface YWebRtcSubscriptionMessage { type: 'subscribe' | 'unsubscribe'; topics?: string[]; } interface YWebRtcPingMessage { type: 'ping'; } interface YWebRtcPublishMessage { type: 'publish'; topic?: string; [k: string]: any; } async function subscribe(topic: string, connectionId: string) { try { return await dynamoDb .updateItem({ TableName: TOPICS_TABLE, Key: { name: { S: topic } }, UpdateExpression: 'ADD receivers :r', ExpressionAttributeValues: { ':r': { SS: [connectionId] }, }, }) .promise(); } catch (err) { console.log(`Cannot update topic ${topic}: ${err.message}`); } } async function unsubscribe(topic: string, connectionId: string) { try { return await dynamoDb .updateItem({ TableName: TOPICS_TABLE, Key: { name: { S: topic } }, UpdateExpression: 'DELETE receivers :r', ExpressionAttributeValues: { ':r': { SS: [connectionId] }, }, }) .promise(); } catch (err) { console.log(`Cannot update topic ${topic}: ${err.message}`); } } async function getReceivers(topic: string) { try { const { Item: item } = await dynamoDb .getItem({ TableName: TOPICS_TABLE, Key: { name: { S: topic } }, }) .promise(); return item?.receivers ? item.receivers.SS : []; } catch (err) { console.log(`Cannot get topic ${topic}: ${err.message}`); return []; } } async function handleYWebRtcMessage( connectionId: string, message: | YWebRtcSubscriptionMessage | YWebRtcPublishMessage | YWebRtcPingMessage, send: (receiver: string, message: any) => Promise<void>, ) { const promises = []; if (message && message.type) { switch (message.type) { case 'subscribe': (message.topics || []).forEach(topic => { promises.push(subscribe(topic, connectionId)); }); break; case 'unsubscribe': (message.topics || []).forEach(topic => { promises.push(unsubscribe(topic, connectionId)); }); break; case 'publish': if (message.topic) { const receivers = await getReceivers(message.topic); receivers.forEach(receiver => { promises.push(send(receiver, message)); }); } break; case 'ping': promises.push(send(connectionId, { type: 'pong' })); break; } } await Promise.all(promises); } function handleConnect(connectionId: string) { // Nothing to do console.log(`Connected: ${connectionId}`); } async function handleDisconnect(connectionId: string) { console.log(`Disconnected: ${connectionId}`); // Remove the connection from all topics // This is quite expensive, as we need to go through all topics in the table const promises = []; for await (const item of scanItems(dynamoDb, TOPICS_TABLE)) { const receivers = item.receivers?.SS ?? []; if (receivers.includes(connectionId)) { promises.push(unsubscribe(item.name.S, connectionId)); } } await Promise.all(promises); } export async function handler( event: HttpV2WebsocketEvent, ): Promise<HttpV2Response> { if (!TOPICS_TABLE) { return { statusCode: 502, body: 'Not configured' }; } // The AWS "simple chat" example uses event.requestContext.domainName/...stage, but that doesn't work with custom domain // names. It also doesn't matter, this is anyways an internal (AWS->AWS) call. const apigwManagementApi = new ApiGatewayManagementApi({ apiVersion: '2018-11-29', endpoint: `https://${event.requestContext.apiId}.execute-api.${AWS_REGION}.amazonaws.com/${event.requestContext.stage}`, }); const send = async (connectionId: string, message: any) => { try { await apigwManagementApi .postToConnection({ ConnectionId: connectionId, Data: JSON.stringify(message), }) .promise(); } catch (err) { if (err.statusCode === 410) { console.log(`Found stale connection, deleting ${connectionId}`); await handleDisconnect(connectionId); } else { // Log, but otherwise ignore: There's not much we can do, really. console.log(`Error when sending to ${connectionId}: ${err.message}`); } } }; try { switch (event.requestContext.routeKey) { case '$connect': handleConnect(event.requestContext.connectionId); break; case '$disconnect': await handleDisconnect(event.requestContext.connectionId); break; case '$default': await handleYWebRtcMessage( event.requestContext.connectionId, JSON.parse(event.body), send, ); break; } return { statusCode: 200 }; } catch (err) { console.log(`Error ${event.requestContext.connectionId}`, err); return { statusCode: 500, body: err.message }; } } -
ronnyroeller revised this gist
Aug 6, 2021 . 1 changed file with 164 additions and 164 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -3,203 +3,203 @@ import { ApiGatewayManagementApi, DynamoDB } from 'aws-sdk'; const { AWS_REGION, TOPICS_TABLE } = process.env; const dynamoDb = new DynamoDB({ apiVersion: '2012-08-10', region: AWS_REGION, }); export async function* scanItems( dynamoDb: DynamoDB, tableName: string, extraScanInput?: Omit<DynamoDB.ScanInput, 'TableName' | 'ExclusiveStartKey'>, ): AsyncGenerator<DynamoDB.AttributeMap, void, unknown> { let startKey: DynamoDB.Key | undefined; do { const result = await dynamoDb .scan({ TableName: tableName, ExclusiveStartKey: startKey, ...extraScanInput, }) .promise(); for (const item of result.Items) { yield item; } startKey = result.LastEvaluatedKey; } while (startKey); } // Message structure and protocol flow taken from y-webrtc/bin/server.js interface YWebRtcSubscriptionMessage { type: 'subscribe' | 'unsubscribe'; topics?: string[]; } interface YWebRtcPingMessage { type: 'ping'; } interface YWebRtcPublishMessage { type: 'publish'; topic?: string; [k: string]: any; } async function subscribe(topic: string, connectionId: string) { return dynamoDb .updateItem({ TableName: TOPICS_TABLE, Key: { name: { S: topic } }, UpdateExpression: 'ADD receivers :r', ExpressionAttributeValues: { ':r': { SS: [connectionId] }, }, }) .promise() .catch(err => { console.log(`Cannot update topic ${topic}: ${err.message}`); }); } async function unsubscribe(topic: string, connectionId: string) { return dynamoDb .updateItem({ TableName: TOPICS_TABLE, Key: { name: { S: topic } }, UpdateExpression: 'DELETE receivers :r', ExpressionAttributeValues: { ':r': { SS: [connectionId] }, }, }) .promise() .catch(err => { console.log(`Cannot update topic ${topic}: ${err.message}`); }); } async function getReceivers(topic: string) { try { const { Item: item } = await dynamoDb .getItem({ TableName: TOPICS_TABLE, Key: { name: { S: topic } }, }) .promise(); return item?.receivers ? item.receivers.SS : []; } catch (err) { console.log(`Cannot get topic ${topic}: ${err.message}`); return []; } } async function handleYWebRtcMessage( connectionId: string, message: | YWebRtcSubscriptionMessage | YWebRtcPublishMessage | YWebRtcPingMessage, send: (receiver: string, message: any) => Promise<void>, ) { const promises = []; if (message && message.type) { switch (message.type) { case 'subscribe': (message.topics || []).forEach(topic => { promises.push(subscribe(topic, connectionId)); }); break; case 'unsubscribe': (message.topics || []).forEach(topic => { promises.push(unsubscribe(topic, connectionId)); }); break; case 'publish': if (message.topic) { const receivers = await getReceivers(message.topic); receivers.forEach(receiver => { promises.push(send(receiver, message)); }); } break; case 'ping': promises.push(send(connectionId, { type: 'pong' })); break; } } await Promise.all(promises); } function handleConnect(connectionId: string) { // Nothing to do console.log(`Connected: ${connectionId}`); } async function handleDisconnect(connectionId: string) { console.log(`Disconnected: ${connectionId}`); // Remove the connection from all topics // This is quite expensive, as we need to go through all topics in the table const promises = []; for await (const item of scanItems(dynamoDb, TOPICS_TABLE)) { const receivers = item.receivers?.SS ?? []; if (receivers.includes(connectionId)) { promises.push(unsubscribe(item.name.S, connectionId)); } } await Promise.all(promises); } export async function handler( event: HttpV2WebsocketEvent, ): Promise<HttpV2Response> { if (!TOPICS_TABLE) { return { statusCode: 502, body: 'Not configured' }; } // The AWS "simple chat" example uses event.requestContext.domainName/...stage, but that doesn't work with custom domain // names. It also doesn't matter, this is anyways an internal (AWS->AWS) call. const apigwManagementApi = new ApiGatewayManagementApi({ apiVersion: '2018-11-29', endpoint: `https://${event.requestContext.apiId}.execute-api.${AWS_REGION}.amazonaws.com/${event.requestContext.stage}`, }); const send = async (connectionId: string, message: any) => { try { await apigwManagementApi .postToConnection({ ConnectionId: connectionId, Data: JSON.stringify(message), }) .promise(); } catch (err) { if (err.statusCode === 410) { console.log(`Found stale connection, deleting ${connectionId}`); await handleDisconnect(connectionId); } else { // Log, but otherwise ignore: There's not much we can do, really. console.log(`Error when sending to ${connectionId}: ${err.message}`); } } }; try { switch (event.requestContext.routeKey) { case '$connect': handleConnect(event.requestContext.connectionId); break; case '$disconnect': await handleDisconnect(event.requestContext.connectionId); break; case '$default': await handleYWebRtcMessage( event.requestContext.connectionId, JSON.parse(event.body), send, ); break; } return { statusCode: 200 }; } catch (err) { console.log(`Error ${event.requestContext.connectionId}`, err); return { statusCode: 500, body: err.message }; } } -
ronnyroeller revised this gist
Aug 6, 2021 . No changes.There are no files selected for viewing
-
ronnyroeller revised this gist
Aug 6, 2021 . No changes.There are no files selected for viewing
-
ronnyroeller created this gist
Aug 6, 2021 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,205 @@ import { ApiGatewayManagementApi, DynamoDB } from 'aws-sdk'; const { AWS_REGION, TOPICS_TABLE } = process.env; const dynamoDb = new DynamoDB({ apiVersion: '2012-08-10', region: AWS_REGION, }); export async function* scanItems( dynamoDb: DynamoDB, tableName: string, extraScanInput?: Omit<DynamoDB.ScanInput, 'TableName' | 'ExclusiveStartKey'>, ): AsyncGenerator<DynamoDB.AttributeMap, void, unknown> { let startKey: DynamoDB.Key | undefined; do { const result = await dynamoDb .scan({ TableName: tableName, ExclusiveStartKey: startKey, ...extraScanInput, }) .promise(); for (const item of result.Items) { yield item; } startKey = result.LastEvaluatedKey; } while (startKey); } // Message structure and protocol flow taken from y-webrtc/bin/server.js interface YWebRtcSubscriptionMessage { type: 'subscribe' | 'unsubscribe'; topics?: string[]; } interface YWebRtcPingMessage { type: 'ping'; } interface YWebRtcPublishMessage { type: 'publish'; topic?: string; [k: string]: any; } async function subscribe(topic: string, connectionId: string) { return dynamoDb .updateItem({ TableName: TOPICS_TABLE, Key: { name: { S: topic } }, UpdateExpression: 'ADD receivers :r', ExpressionAttributeValues: { ':r': { SS: [connectionId] }, }, }) .promise() .catch(err => { console.log(`Cannot update topic ${topic}: ${err.message}`); }); } async function unsubscribe(topic: string, connectionId: string) { return dynamoDb .updateItem({ TableName: TOPICS_TABLE, Key: { name: { S: topic } }, UpdateExpression: 'DELETE receivers :r', ExpressionAttributeValues: { ':r': { SS: [connectionId] }, }, }) .promise() .catch(err => { console.log(`Cannot update topic ${topic}: ${err.message}`); }); } async function getReceivers(topic: string) { try { const { Item: item } = await dynamoDb .getItem({ TableName: TOPICS_TABLE, Key: { name: { S: topic } }, }) .promise(); return item?.receivers ? item.receivers.SS : []; } catch (err) { console.log(`Cannot get topic ${topic}: ${err.message}`); return []; } } async function handleYWebRtcMessage( connectionId: string, message: | YWebRtcSubscriptionMessage | YWebRtcPublishMessage | YWebRtcPingMessage, send: (receiver: string, message: any) => Promise<void>, ) { const promises = []; if (message && message.type) { switch (message.type) { case 'subscribe': (message.topics || []).forEach(topic => { promises.push(subscribe(topic, connectionId)); }); break; case 'unsubscribe': (message.topics || []).forEach(topic => { promises.push(unsubscribe(topic, connectionId)); }); break; case 'publish': if (message.topic) { const receivers = await getReceivers(message.topic); receivers.forEach(receiver => { promises.push(send(receiver, message)); }); } break; case 'ping': promises.push(send(connectionId, { type: 'pong' })); break; } } await Promise.all(promises); } function handleConnect(connectionId: string) { // Nothing to do console.log(`Connected: ${connectionId}`); } async function handleDisconnect(connectionId: string) { console.log(`Disconnected: ${connectionId}`); // Remove the connection from all topics // This is quite expensive, as we need to go through all topics in the table const promises = []; for await (const item of scanItems(dynamoDb, TOPICS_TABLE)) { const receivers = item.receivers?.SS ?? []; if (receivers.includes(connectionId)) { promises.push(unsubscribe(item.name.S, connectionId)); } } await Promise.all(promises); } export async function handler( event: HttpV2WebsocketEvent, ): Promise<HttpV2Response> { if (!TOPICS_TABLE) { return { statusCode: 502, body: 'Not configured' }; } // The AWS "simple chat" example uses event.requestContext.domainName/...stage, but that doesn't work with custom domain // names. It also doesn't matter, this is anyways an internal (AWS->AWS) call. const apigwManagementApi = new ApiGatewayManagementApi({ apiVersion: '2018-11-29', endpoint: `https://${event.requestContext.apiId}.execute-api.${AWS_REGION}.amazonaws.com/${event.requestContext.stage}`, }); const send = async (connectionId: string, message: any) => { try { await apigwManagementApi .postToConnection({ ConnectionId: connectionId, Data: JSON.stringify(message), }) .promise(); } catch (err) { if (err.statusCode === 410) { console.log(`Found stale connection, deleting ${connectionId}`); await handleDisconnect(connectionId); } else { // Log, but otherwise ignore: There's not much we can do, really. console.log(`Error when sending to ${connectionId}: ${err.message}`); } } }; try { switch (event.requestContext.routeKey) { case '$connect': handleConnect(event.requestContext.connectionId); break; case '$disconnect': await handleDisconnect(event.requestContext.connectionId); break; case '$default': await handleYWebRtcMessage( event.requestContext.connectionId, JSON.parse(event.body), send, ); break; } return { statusCode: 200 }; } catch (err) { console.log(`Error ${event.requestContext.connectionId}`, err); return { statusCode: 500, body: err.message }; } }