Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save malefs/f10f816ad55b4a349f3b7de92d080977 to your computer and use it in GitHub Desktop.

Select an option

Save malefs/f10f816ad55b4a349f3b7de92d080977 to your computer and use it in GitHub Desktop.
FastAPI Websocket Bidirectional Redis PubSub
import asyncio
import logging
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from fastapi.websockets import WebSocket, WebSocketDisconnect
from aioredis import create_connection, Channel
# Configure the root logger to emit INFO-level records and above.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the handlers below.
logger = logging.getLogger(__name__)
# FastAPI application instance; the HTTP and WebSocket routes are registered on it.
app = FastAPI()
# Minimal chat client page: opens a WebSocket to the "/ws" endpoint on the
# same host that served the page, appends every received message to a list,
# and sends the text-box contents when the form is submitted.
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var scheme = (window.location.protocol === "https:") ? "wss://" : "ws://";
var ws = new WebSocket(scheme + window.location.host + "/ws");
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""
@app.get("/")
async def get():
    """Serve the chat client page as an HTML response."""
    page = HTMLResponse(html)
    return page
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Bridge one WebSocket client to the Redis chat channel in both directions.

    Accepts the connection, then runs ``handle_websocket`` (client -> Redis)
    and ``handle_redis`` (Redis -> client) concurrently. As soon as either
    one finishes, the other is cancelled so the endpoint returns instead of
    waiting forever on a handler that has no reason to stop by itself.

    Args:
        websocket: The incoming WebSocket connection.

    Raises:
        Exception: Whatever exception ended the handler that finished first.
    """
    await websocket.accept()
    tasks = [
        asyncio.ensure_future(handle_websocket(websocket)),
        asyncio.ensure_future(handle_redis(websocket)),
    ]
    try:
        done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    finally:
        # cancel() is a no-op on tasks that already finished.
        for task in tasks:
            task.cancel()
        # Let cancelled tasks run their cleanup before the endpoint returns.
        await asyncio.gather(*tasks, return_exceptions=True)
    # Surface an exception from a handler that ended on its own.
    for task in done:
        if not task.cancelled():
            task.result()
async def handle_redis(websocket):
    """Forward messages from the Redis "chat" channel to the WebSocket client.

    Opens its own Redis connection, subscribes to "chat", and sends every
    non-empty message to ``websocket`` decoded as UTF-8 text. An exception
    raised inside the forwarding loop is logged and ends the loop. The Redis
    connection is closed on every exit path.

    Args:
        websocket: The accepted WebSocket that messages are sent to.
    """
    redis_conn = await create_connection(("localhost", 6379))
    try:
        channel = Channel("chat", is_pattern=False)
        await redis_conn.execute_pubsub("subscribe", channel)
        try:
            while True:
                message = await channel.get()
                if message:
                    await websocket.send_text(message.decode("utf-8"))
        except Exception as exc:
            logger.error(exc)
    finally:
        # Release the subscriber connection instead of leaking one per client.
        redis_conn.close()
        await redis_conn.wait_closed()
async def handle_websocket(websocket):
    """Publish text received from the WebSocket client to the Redis "chat" channel.

    Opens its own Redis connection and publishes every non-empty text message
    received from ``websocket``. A ``WebSocketDisconnect`` is logged and ends
    the loop. The Redis connection is closed on every exit path.

    Args:
        websocket: The accepted WebSocket that messages are read from.
    """
    redis_conn = await create_connection(("localhost", 6379))
    try:
        while True:
            message = await websocket.receive_text()
            if message:
                await redis_conn.execute("publish", "chat", message)
    except WebSocketDisconnect as exc:
        logger.error(exc)
    finally:
        # Release the publisher connection instead of leaking one per client.
        redis_conn.close()
        await redis_conn.wait_closed()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment