Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Hub Mode for inspector #2881

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions sanic/config.py
Expand Up @@ -50,6 +50,9 @@
"INSPECTOR_TLS_KEY": _default,
"INSPECTOR_TLS_CERT": _default,
"INSPECTOR_API_KEY": "",
"INSPECTOR_HUB_MODE": _default,
"INSPECTOR_HUB_HOST": "",
"INSPECTOR_HUB_PORT": 0,
"KEEP_ALIVE_TIMEOUT": 120,
"KEEP_ALIVE": True,
"LOCAL_CERT_CREATOR": LocalCertCreator.AUTO,
Expand Down Expand Up @@ -108,6 +111,9 @@ class Config(dict, metaclass=DescriptorMeta):
INSPECTOR_TLS_KEY: Union[Path, str, Default]
INSPECTOR_TLS_CERT: Union[Path, str, Default]
INSPECTOR_API_KEY: str
INSPECTOR_HUB_MODE: Union[bool, Default]
INSPECTOR_HUB_HOST: str
INSPECTOR_HUB_PORT: int
KEEP_ALIVE_TIMEOUT: int
KEEP_ALIVE: bool
LOCAL_CERT_CREATOR: Union[str, LocalCertCreator]
Expand Down
6 changes: 5 additions & 1 deletion sanic/mixins/startup.py
Expand Up @@ -1134,8 +1134,12 @@ def serve(
primary.config.INSPECTOR_API_KEY,
primary.config.INSPECTOR_TLS_KEY,
primary.config.INSPECTOR_TLS_CERT,
primary.config.INSPECTOR_HUB_MODE,
primary.config.INSPECTOR_HUB_HOST,
primary.config.INSPECTOR_HUB_PORT,
)
manager.manage("Inspector", inspector, {}, transient=False)
# TODO: Change back to false
manager.manage("Inspector", inspector, {}, transient=True)

primary._inspector = inspector
primary._manager = manager
Expand Down
113 changes: 111 additions & 2 deletions sanic/worker/inspector.py
@@ -1,17 +1,64 @@
from __future__ import annotations

from asyncio import sleep
from dataclasses import dataclass
from datetime import datetime
from inspect import isawaitable
from multiprocessing.connection import Connection
from os import environ
from pathlib import Path
from typing import Any, Dict, Mapping, Union
from typing import TYPE_CHECKING, Any, Dict, Mapping, Tuple, Union

from websockets import ConnectionClosed, connect, connection

from sanic.exceptions import Unauthorized
from sanic.helpers import Default
from sanic.helpers import Default, _default
from sanic.log import logger
from sanic.request import Request
from sanic.response import json
from sanic.server.websockets.impl import WebsocketImplProtocol


if TYPE_CHECKING:
from sanic import Sanic


@dataclass
class NodeState:
...


@dataclass
class HubState:
nodes: Dict[str, NodeState]


class NodeClient:
def __init__(self, hub_host: str, hub_port: int) -> None:
self.hub_host = hub_host
self.hub_port = hub_port

async def run(self, state_getter) -> None:
try:
async for ws in connect(
f"ws://{self.hub_host}:{self.hub_port}/hub"
):
try:
await self._run_node(ws, state_getter)
except ConnectionClosed:
continue
except BaseException:
...
finally:
print("Node out")

def _setup_ws_client(self, hub_host: str, hub_port: int) -> connection:
return connect(f"ws://{hub_host}:{hub_port}/hub")

async def _run_node(self, ws: connection, state_getter) -> None:
while True:
await ws.send(str(state_getter()))
await sleep(3)


class Inspector:
Expand Down Expand Up @@ -46,7 +93,13 @@ def __init__(
api_key: str,
tls_key: Union[Path, str, Default],
tls_cert: Union[Path, str, Default],
hub_mode: Union[bool, Default] = _default,
hub_host: str = "",
hub_port: int = 0,
):
hub_mode, node_mode = self._detect_modes(
hub_mode, host, port, hub_host, hub_port
)
self._publisher = publisher
self.app_info = app_info
self.worker_state = worker_state
Expand All @@ -55,6 +108,10 @@ def __init__(
self.api_key = api_key
self.tls_key = tls_key
self.tls_cert = tls_cert
self.hub_mode = hub_mode
self.node_mode = node_mode
self.hub_host = hub_host
self.hub_port = hub_port

def __call__(self, run=True, **_) -> Inspector:
from sanic import Sanic
Expand All @@ -70,14 +127,45 @@ def __call__(self, run=True, **_) -> Inspector:
if not isinstance(self.tls_key, Default)
and not isinstance(self.tls_cert, Default)
else None,
debug=True,
)
return self

def _detect_modes(
self,
hub_mode: Union[bool, Default],
host: str,
port: int,
hub_host: str,
hub_port: int,
) -> Tuple[bool, bool]:
print(hub_mode, host, port, hub_host, hub_port)
if hub_host == host and hub_port == port:
if not hub_mode:
raise ValueError(
"Hub mode must be enabled when using the same "
"host and port for the hub and the inspector"
)
hub_mode = True
if (hub_host and not hub_port) or (hub_port and not hub_host):
raise ValueError("Both hub host and hub port must be specified")
if hub_mode is True:
return True, False
elif hub_host and hub_port:
return False, True
else:
return False, False

def _setup(self):
self.app.get("/")(self._info)
self.app.post("/<action:str>")(self._action)
if self.api_key:
self.app.on_request(self._authentication)
if self.hub_mode:
self.app.before_server_start(self._setup_hub)
self.app.websocket("/hub")(self._hub)
if self.node_mode:
self.app.before_server_start(self._run_node)
environ["SANIC_IGNORE_PRODUCTION_WARNING"] = "true"

def _authentication(self, request: Request) -> None:
Expand Down Expand Up @@ -154,3 +242,24 @@ def shutdown(self) -> None:
"""Shutdown the workers"""
message = "__TERMINATE__"
self._publisher.send(message)

def _setup_hub(self, app: Sanic) -> None:
app.ctx.hub_state = HubState(nodes={})

@staticmethod
async def _hub(
request: Request,
websocket: WebsocketImplProtocol,
) -> None:
hub_state = request.app.ctx.hub_state
hub_state.nodes[request.id] = NodeState()
while True:
message = await websocket.recv()
if message == "ping":
await websocket.send("pong")
else:
logger.info("Hub received message: %s", message)

async def _run_node(self, app: Sanic) -> None:
client = NodeClient(self.hub_host, self.hub_port)
app.add_task(client.run(self._state_to_json))