Skip to content

Instantly share code, notes, and snippets.

@timhughes
Last active February 21, 2025 08:41
Show Gist options
  • Save timhughes/313c89a0d587a25506e204573c8017e4 to your computer and use it in GitHub Desktop.
Save timhughes/313c89a0d587a25506e204573c8017e4 to your computer and use it in GitHub Desktop.

Revisions

  1. timhughes revised this gist Apr 20, 2020. 1 changed file with 22 additions and 0 deletions.
    22 changes: 22 additions & 0 deletions fastapi_websocket_redis_pubsub.py
    Original file line number Diff line number Diff line change
    @@ -1,3 +1,25 @@
    """
    Usage:
    Make sure that redis is running on localhost (or adjust the url)
    Install uvicorn or some other asgi server https://asgi.readthedocs.io/en/latest/implementations.html
    pip install -u uvicorn
    Install dependencies
    pip install -u aioredis fastapi
    Start the application, this will depend on the asgi server
    uvicorn fastapi_websocket_redis_pubsub:app
    Open two browser windows to the web interface http://127.0.0.1:8000
    Enter some data in one window and it should appear in the other window.
    """

    import asyncio

    import logging
  2. timhughes revised this gist Mar 18, 2020. 1 changed file with 4 additions and 3 deletions.
    7 changes: 4 additions & 3 deletions fastapi_websocket_redis_pubsub.py
    Original file line number Diff line number Diff line change
    @@ -60,8 +60,9 @@ async def websocket_endpoint(websocket: WebSocket):
    await redis_connector(websocket)


    async def redis_connector(websocket: WebSocket):

    async def redis_connector(
    websocket: WebSocket, redis_uri: str = "redis://localhost:6379"
    ):
    async def consumer_handler(ws: WebSocket, r):
    try:
    while True:
    @@ -84,7 +85,7 @@ async def producer_handler(r, ws: WebSocket):
    # TODO this needs handling better
    logger.error(exc)

    redis = await aioredis.create_redis_pool("redis://localhost:6379")
    redis = await aioredis.create_redis_pool(redis_uri)

    consumer_task = consumer_handler(websocket, redis)
    producer_task = producer_handler(redis, websocket)
  3. timhughes revised this gist Mar 18, 2020. 1 changed file with 41 additions and 26 deletions.
    67 changes: 41 additions & 26 deletions fastapi_websocket_redis_pubsub.py
    Original file line number Diff line number Diff line change
    @@ -6,7 +6,7 @@
    from fastapi import FastAPI
    from fastapi.responses import HTMLResponse
    from fastapi.websockets import WebSocket, WebSocketDisconnect
    from aioredis import create_connection, Channel
    from aioredis

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    @@ -57,28 +57,43 @@ async def get():
    @app.websocket("/ws")
    async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    await asyncio.gather(handle_websocket(websocket), handle_redis(websocket))


    async def handle_redis(websocket):
    redis_conn = await create_connection(("localhost", 6379))
    channel = Channel("chat", is_pattern=False)
    await redis_conn.execute_pubsub("subscribe", channel)
    try:
    while True:
    message = await channel.get()
    if message:
    await websocket.send_text(message.decode("utf-8"))
    except Exception as exc:
    logger.error(exc)


    async def handle_websocket(websocket):
    redis_conn = await create_connection(("localhost", 6379))
    try:
    while True:
    message = await websocket.receive_text()
    if message:
    await redis_conn.execute("publish", "chat", message)
    except WebSocketDisconnect as exc:
    logger.error(exc)
    await redis_connector(websocket)


    async def redis_connector(websocket: WebSocket):

    async def consumer_handler(ws: WebSocket, r):
    try:
    while True:
    message = await ws.receive_text()
    if message:
    await r.publish("chat:c", message)
    except WebSocketDisconnect as exc:
    # TODO this needs handling better
    logger.error(exc)

    async def producer_handler(r, ws: WebSocket):
    (channel,) = await r.subscribe("chat:c")
    assert isinstance(channel, aioredis.Channel)
    try:
    while True:
    message = await channel.get()
    if message:
    await ws.send_text(message.decode("utf-8"))
    except Exception as exc:
    # TODO this needs handling better
    logger.error(exc)

    redis = await aioredis.create_redis_pool("redis://localhost:6379")

    consumer_task = consumer_handler(websocket, redis)
    producer_task = producer_handler(redis, websocket)
    done, pending = await asyncio.wait(
    [consumer_task, producer_task], return_when=asyncio.FIRST_COMPLETED,
    )
    logger.debug(f"Done task: {done}")
    for task in pending:
    logger.debug(f"Canceling task: {task}")
    task.cancel()
    redis.close()
    await redis.wait_closed()
  4. timhughes renamed this gist Mar 18, 2020. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  5. timhughes created this gist Mar 18, 2020.
    84 changes: 84 additions & 0 deletions main.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,84 @@
    import asyncio

    import logging


    from fastapi import FastAPI
    from fastapi.responses import HTMLResponse
    from fastapi.websockets import WebSocket, WebSocketDisconnect
    from aioredis import create_connection, Channel

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)


    app = FastAPI()

    html = """
    <!DOCTYPE html>
    <html>
    <head>
    <title>Chat</title>
    </head>
    <body>
    <h1>WebSocket Chat</h1>
    <form action="" onsubmit="sendMessage(event)">
    <input type="text" id="messageText" autocomplete="off"/>
    <button>Send</button>
    </form>
    <ul id='messages'>
    </ul>
    <script>
    var ws = new WebSocket("ws://localhost:8000/ws");
    ws.onmessage = function(event) {
    var messages = document.getElementById('messages')
    var message = document.createElement('li')
    var content = document.createTextNode(event.data)
    message.appendChild(content)
    messages.appendChild(message)
    };
    function sendMessage(event) {
    var input = document.getElementById("messageText")
    ws.send(input.value)
    input.value = ''
    event.preventDefault()
    }
    </script>
    </body>
    </html>
    """


    @app.get("/")
    async def get():
    return HTMLResponse(html)


    @app.websocket("/ws")
    async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    await asyncio.gather(handle_websocket(websocket), handle_redis(websocket))


    async def handle_redis(websocket):
    redis_conn = await create_connection(("localhost", 6379))
    channel = Channel("chat", is_pattern=False)
    await redis_conn.execute_pubsub("subscribe", channel)
    try:
    while True:
    message = await channel.get()
    if message:
    await websocket.send_text(message.decode("utf-8"))
    except Exception as exc:
    logger.error(exc)


    async def handle_websocket(websocket):
    redis_conn = await create_connection(("localhost", 6379))
    try:
    while True:
    message = await websocket.receive_text()
    if message:
    await redis_conn.execute("publish", "chat", message)
    except WebSocketDisconnect as exc:
    logger.error(exc)