Last active
February 21, 2025 08:41
-
-
Save timhughes/313c89a0d587a25506e204573c8017e4 to your computer and use it in GitHub Desktop.
Revisions
-
timhughes revised this gist
Apr 20, 2020 . 1 changed file with 22 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,3 +1,25 @@ """ Usage: Make sure that redis is running on localhost (or adjust the url) Install uvicorn or some other asgi server https://asgi.readthedocs.io/en/latest/implementations.html pip install -u uvicorn Install dependencies pip install -u aioredis fastapi Start the application, this will depend on the asgi server uvicorn fastapi_websocket_redis_pubsub:app Open two browser windows to the web interface http://127.0.0.1:8000 Enter some data in one window and it should appear in the other window. """ import asyncio import logging -
timhughes revised this gist
Mar 18, 2020 . 1 changed file with 4 additions and 3 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -60,8 +60,9 @@ async def websocket_endpoint(websocket: WebSocket): await redis_connector(websocket) async def redis_connector( websocket: WebSocket, redis_uri: str = "redis://localhost:6379" ): async def consumer_handler(ws: WebSocket, r): try: while True: @@ -84,7 +85,7 @@ async def producer_handler(r, ws: WebSocket): # TODO this needs handling better logger.error(exc) redis = await aioredis.create_redis_pool(redis_uri) consumer_task = consumer_handler(websocket, redis) producer_task = producer_handler(redis, websocket) -
timhughes revised this gist
Mar 18, 2020 . 1 changed file with 41 additions and 26 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -6,7 +6,7 @@ from fastapi import FastAPI from fastapi.responses import HTMLResponse from fastapi.websockets import WebSocket, WebSocketDisconnect from aioredis logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -57,28 +57,43 @@ async def get(): @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): await websocket.accept() await redis_connector(websocket) async def redis_connector(websocket: WebSocket): async def consumer_handler(ws: WebSocket, r): try: while True: message = await ws.receive_text() if message: await r.publish("chat:c", message) except WebSocketDisconnect as exc: # TODO this needs handling better logger.error(exc) async def producer_handler(r, ws: WebSocket): (channel,) = await r.subscribe("chat:c") assert isinstance(channel, aioredis.Channel) try: while True: message = await channel.get() if message: await ws.send_text(message.decode("utf-8")) except Exception as exc: # TODO this needs handling better logger.error(exc) redis = await aioredis.create_redis_pool("redis://localhost:6379") consumer_task = consumer_handler(websocket, redis) producer_task = producer_handler(redis, websocket) done, pending = await asyncio.wait( [consumer_task, producer_task], return_when=asyncio.FIRST_COMPLETED, ) logger.debug(f"Done task: {done}") for task in pending: logger.debug(f"Canceling task: {task}") task.cancel() redis.close() await redis.wait_closed() -
timhughes renamed this gist
Mar 18, 2020 . 1 changed file with 0 additions and 0 deletions.There are no files selected for viewing
File renamed without changes. -
timhughes created this gist
Mar 18, 2020 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,84 @@ import asyncio import logging from fastapi import FastAPI from fastapi.responses import HTMLResponse from fastapi.websockets import WebSocket, WebSocketDisconnect from aioredis import create_connection, Channel logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = FastAPI() html = """ <!DOCTYPE html> <html> <head> <title>Chat</title> </head> <body> <h1>WebSocket Chat</h1> <form action="" onsubmit="sendMessage(event)"> <input type="text" id="messageText" autocomplete="off"/> <button>Send</button> </form> <ul id='messages'> </ul> <script> var ws = new WebSocket("ws://localhost:8000/ws"); ws.onmessage = function(event) { var messages = document.getElementById('messages') var message = document.createElement('li') var content = document.createTextNode(event.data) message.appendChild(content) messages.appendChild(message) }; function sendMessage(event) { var input = document.getElementById("messageText") ws.send(input.value) input.value = '' event.preventDefault() } </script> </body> </html> """ @app.get("/") async def get(): return HTMLResponse(html) @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): await websocket.accept() await asyncio.gather(handle_websocket(websocket), handle_redis(websocket)) async def handle_redis(websocket): redis_conn = await create_connection(("localhost", 6379)) channel = Channel("chat", is_pattern=False) await redis_conn.execute_pubsub("subscribe", channel) try: while True: message = await channel.get() if message: await websocket.send_text(message.decode("utf-8")) except Exception as exc: logger.error(exc) async def handle_websocket(websocket): redis_conn = await create_connection(("localhost", 6379)) try: while True: message = await websocket.receive_text() if message: await redis_conn.execute("publish", "chat", message) except WebSocketDisconnect as exc: logger.error(exc)