Starlette way to notify user via WebSocket #1191

Answered by nodarai
nodarai asked this question in Q&A
I found a solution in the FastAPI documentation. It uses Starlette's WebSockets, so the same approach should work in plain Starlette.

from typing import List

from starlette.websockets import WebSocket


class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await c…
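Since the question is about notifying one particular user, here is a minimal sketch of how the same idea could be keyed by user. This is not from the FastAPI docs: `UserConnectionRegistry`, `notify`, and the `user_id` key are hypothetical names, and how a connection gets mapped to a user (session, token, path parameter) is left as an assumption. The only thing it relies on is an async `send_text(str)` method, which Starlette's `WebSocket` provides. `FakeSocket` is a stand-in so the sketch runs without a server.

```python
import asyncio
from typing import Dict, List, Protocol


class TextSender(Protocol):
    """Anything with an async send_text(str) method, e.g. a Starlette WebSocket."""

    async def send_text(self, data: str) -> None: ...


class UserConnectionRegistry:
    """Hypothetical helper: tracks open connections per user id."""

    def __init__(self) -> None:
        self._by_user: Dict[str, List[TextSender]] = {}

    def register(self, user_id: str, connection: TextSender) -> None:
        # A user may have several tabs or devices open, so keep a list.
        self._by_user.setdefault(user_id, []).append(connection)

    def unregister(self, user_id: str, connection: TextSender) -> None:
        connections = self._by_user.get(user_id, [])
        if connection in connections:
            connections.remove(connection)
        if not connections:
            # Drop the key once the user has no open connections left.
            self._by_user.pop(user_id, None)

    async def notify(self, user_id: str, message: str) -> int:
        """Send message to every connection of one user; return the number sent."""
        sent = 0
        # Iterate over a copy so failed connections can be removed in the loop.
        for connection in list(self._by_user.get(user_id, [])):
            try:
                await connection.send_text(message)
                sent += 1
            except Exception:
                # Assumption: any send failure means the client is gone.
                self.unregister(user_id, connection)
        return sent


class FakeSocket:
    """Stand-in for a real WebSocket, records what it was sent."""

    def __init__(self) -> None:
        self.received: List[str] = []

    async def send_text(self, data: str) -> None:
        self.received.append(data)


async def demo() -> List[str]:
    registry = UserConnectionRegistry()
    alice = FakeSocket()
    registry.register("alice", alice)
    await registry.notify("alice", "build finished")
    await registry.notify("bob", "nobody is listening")  # no connections, no error
    return alice.received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In a real Starlette WebSocket route you would presumably call `register` after `await websocket.accept()` and `unregister` when the receive loop ends with `WebSocketDisconnect`. Note that a registry like this lives in one process only, so with several workers a notification would reach only the users connected to the worker that sends it.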

Answer selected by nodarai