Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websocket support #193

Open
Kane610 opened this issue Mar 16, 2023 · 1 comment
Open

Websocket support #193

Kane610 opened this issue Mar 16, 2023 · 1 comment

Comments

@Kane610
Copy link
Owner

Kane610 commented Mar 16, 2023

https://github.com/dmannion1972/axis-websocket-metadata-python/blob/main/getsesssionid.py

https://github.com/dmannion1972/axis-websocket-metadata-python

encode/httpx#304
https://gist.github.com/tomchristie/3293d5b118b5646ce79cc074976744b0

import trio

async def main():
    """Connect to the local websocket server, send a greeting, print the reply."""
    async with ws_connect("ws://127.0.0.1:8765") as connection:
        await connection.send("Hello, world.")
        print(await connection.recv())

# Script entry point: hand the `main` coroutine function to trio's runner.
# NOTE(review): in this paste `ws_connect` is defined further down, so this
# call would have to move below that definition for a single-file run.
trio.run(main)
import base64
import contextlib
import os

import httpx
import wsproto


class ConnectionClosed(Exception):
    """Signal that the websocket connection has been closed."""


class WebsocketConnection:
    """Minimal text-only websocket client on top of an upgraded network stream.

    Framing is delegated to a ``wsproto.Connection`` in client mode; this
    class only moves bytes between that state machine and the stream.
    """

    def __init__(self, network_steam):
        """
        Wrap a network stream whose HTTP upgrade handshake is already done.

        ``network_steam`` (spelling kept for backward compatibility) must
        provide awaitable ``read(max_bytes=...)`` and ``write(data)`` methods.
        """
        self._ws_connection_state = wsproto.Connection(wsproto.ConnectionType.CLIENT)
        self._network_stream = network_steam
        # Parsed events not yet consumed by recv().
        self._events = []
        # Pieces of a text message that is still being received.
        self._text_fragments = []

    async def send(self, text):
        """
        Send a text frame over the websocket connection.
        """
        event = wsproto.events.TextMessage(text)
        data = self._ws_connection_state.send(event)
        await self._network_stream.write(data)

    async def recv(self):
        """
        Receive the next complete text message from the websocket connection.

        Pings are answered with a pong; other non-text events are skipped.
        Raises ``ConnectionClosed`` when the peer sends a close frame or the
        underlying stream reaches end-of-file.
        """
        while True:
            while not self._events:
                data = await self._network_stream.read(max_bytes=4096)
                if not data:
                    # An empty read means EOF; without this check the loop
                    # would spin forever because no new event can arrive.
                    raise ConnectionClosed()
                self._ws_connection_state.receive_data(data)
                self._events = list(self._ws_connection_state.events())

            event = self._events.pop(0)
            if isinstance(event, wsproto.events.TextMessage):
                self._text_fragments.append(event.data)
                # A message may be split over several frames/reads; only
                # hand it to the caller once the final piece has arrived.
                if event.message_finished:
                    message = "".join(self._text_fragments)
                    self._text_fragments = []
                    return message
            elif isinstance(event, wsproto.events.CloseConnection):
                raise ConnectionClosed()
            elif isinstance(event, wsproto.events.Ping):
                pong = self._ws_connection_state.send(event.response())
                await self._network_stream.write(pong)
            # Any other event (pong, binary message, ...) is ignored and the
            # loop keeps waiting, instead of returning None to the caller.


@contextlib.asynccontextmanager
async def ws_connect(url):
    """
    Open a websocket connection to ``url`` and yield a ``WebsocketConnection``.

    The HTTP upgrade request is sent with httpx; the raw network stream is
    taken from the response extensions and handed to the websocket wrapper.
    The client and the response are closed when the context exits.

    Raises ``ConnectionClosed`` if the server does not answer the upgrade
    request with ``101 Switching Protocols``.
    """
    headers = {
        "connection": "upgrade",
        "upgrade": "websocket",
        # Decode so every header value is a str rather than mixing in bytes.
        "sec-websocket-key": base64.b64encode(os.urandom(16)).decode("ascii"),
        "sec-websocket-version": "13",
    }

    async with httpx.AsyncClient() as client:
        async with client.stream("GET", url, headers=headers) as response:
            # Anything other than 101 means the connection was not upgraded,
            # so the stream would carry plain HTTP rather than websocket frames.
            if response.status_code != 101:
                raise ConnectionClosed(
                    f"websocket upgrade failed: HTTP {response.status_code}"
                )
            network_stream = response.extensions["network_stream"]
            yield WebsocketConnection(network_stream)
@Kane610
Copy link
Owner Author

Kane610 commented Nov 2, 2023

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant