Skip to content

Commit

Permalink
refactor: gRPC server lifecycle (#2801)
Browse files: browse the repository at this point in the history
Signed-off-by: Aaron Pham <29749331+aarnphm@users.noreply.github.com>
  • Loading branch information
aarnphm committed Jul 24, 2022
1 parent 56d5007 commit 19f9e54
Show file tree
Hide file tree
Showing 16 changed files with 537 additions and 149 deletions.
1 change: 1 addition & 0 deletions bentoml/_internal/bento/build_dev_bentoml_whl.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def build_bentoml_editable_wheel(target_path: str) -> None:
)
try:
import build.env

from build import ProjectBuilder
except ModuleNotFoundError:
raise BentoMLException(
Expand Down
124 changes: 53 additions & 71 deletions bentoml/_internal/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,15 @@

logger = logging.getLogger(__name__)


SCRIPT_RUNNER = "bentoml._internal.server.cli.runner"
SCRIPT_API_SERVER = "bentoml._internal.server.cli.api_server"
SCRIPT_DEV_API_SERVER = "bentoml._internal.server.cli.dev_api_server"
SCRIPT_GRPC_DEV_API_SERVER = "bentoml._internal.server.cli.grpc_dev_api_server"
SCRIPT_API_SERVER = "bentoml._internal.server.cli.http.api_server"
SCRIPT_DEV_API_SERVER = "bentoml._internal.server.cli.http.dev_api_server"
SCRIPT_GRPC_DEV_API_SERVER = "bentoml._internal.server.cli.grpc.dev_api_server"

MAX_AF_UNIX_PATH_LENGTH = 103

API_SERVER_NAME = "_bento_api_server"


@inject
Expand Down Expand Up @@ -85,76 +90,57 @@ def serve_development(
backlog: int = Provide[BentoMLContainer.api_server_config.backlog],
bentoml_home: str = Provide[BentoMLContainer.bentoml_home],
reload: bool = False,
using_grpc: bool = False,
grpc: bool = False,
) -> None:
working_dir = os.path.realpath(os.path.expanduser(working_dir))
svc = load(bento_identifier, working_dir=working_dir) # verify service loading

from circus.sockets import CircusSocket # type: ignore
from circus.watcher import Watcher # type: ignore
from circus.sockets import CircusSocket
from circus.watcher import Watcher

prometheus_dir = ensure_prometheus_dir()

watchers: t.List[Watcher] = []
watchers: list[Watcher] = []

if grpc:
watcher_name = "grpc_dev_api_server"
script_to_use = SCRIPT_GRPC_DEV_API_SERVER
socket_path = f"tcp://{host}:{port}"
else:
watcher_name = "dev_api_server"
script_to_use = SCRIPT_DEV_API_SERVER
socket_path = f"fd://$(circus.sockets.{API_SERVER_NAME})"

circus_sockets: t.List[CircusSocket] = []
circus_sockets: list[CircusSocket] = []
circus_sockets.append(
CircusSocket(
name="_bento_api_server",
host=host,
port=port,
backlog=backlog,
)
CircusSocket(name=API_SERVER_NAME, host=host, port=port, backlog=backlog)
)

if using_grpc:
watchers.append(
Watcher(
name="grpc_dev_api_server",
cmd=sys.executable,
args=[
"-m",
SCRIPT_GRPC_DEV_API_SERVER,
bento_identifier,
"--bind",
"fd://$(circus.sockets._bento_api_server)",
"--working-dir",
working_dir,
"--port",
str(port),
],
copy_env=True,
stop_children=True,
use_sockets=True,
working_dir=working_dir,
)
)
else:
watchers.append(
Watcher(
name="dev_api_server",
cmd=sys.executable,
args=[
"-m",
SCRIPT_DEV_API_SERVER,
bento_identifier,
"--bind",
"fd://$(circus.sockets._bento_api_server)",
"--working-dir",
working_dir,
"--prometheus-dir",
prometheus_dir,
],
copy_env=True,
stop_children=True,
use_sockets=True,
working_dir=working_dir,
)
watchers.append(
Watcher(
name=watcher_name,
cmd=sys.executable,
args=[
"-m",
script_to_use,
bento_identifier,
"--bind",
socket_path,
"--working-dir",
working_dir,
"--prometheus-dir",
prometheus_dir,
],
copy_env=True,
stop_children=True,
use_sockets=True,
working_dir=working_dir,
)
)

plugins = []
if reload:
if sys.platform == "win32":
if psutil.WINDOWS:
logger.warning(
"Due to circus limitations, output from the reloader plugin will not be shown on Windows."
)
Expand All @@ -175,7 +161,7 @@ def serve_development(
watchers,
sockets=circus_sockets,
plugins=plugins,
debug=True if sys.platform != "win32" else False,
debug=not psutil.WINDOWS,
loggerconfig=SERVER_LOGGING_CONFIG,
loglevel="WARNING",
)
Expand All @@ -189,9 +175,6 @@ def serve_development(
)


MAX_AF_UNIX_PATH_LENGTH = 103


@inject
def serve_production(
bento_identifier: str,
Expand All @@ -204,8 +187,8 @@ def serve_production(
working_dir = os.path.realpath(os.path.expanduser(working_dir))
svc = load(bento_identifier, working_dir=working_dir, standalone_load=True)

from circus.sockets import CircusSocket # type: ignore
from circus.watcher import Watcher # type: ignore
from circus.sockets import CircusSocket
from circus.watcher import Watcher

watchers: t.List[Watcher] = []
circus_socket_map: t.Dict[str, CircusSocket] = {}
Expand Down Expand Up @@ -284,7 +267,7 @@ def serve_production(
working_dir,
"--no-access-log",
"--worker-id",
"$(circus.wid)",
"$(CIRCUS.WID)",
],
copy_env=True,
stop_children=True,
Expand All @@ -293,16 +276,15 @@ def serve_production(
numprocesses=runner.scheduled_worker_count,
)
)
port_stack.enter_context(
reserve_free_port()
) # reserve one more to avoid conflicts
# reserve one more to avoid conflicts
port_stack.enter_context(reserve_free_port())
else:
raise NotImplementedError("Unsupported platform: {}".format(sys.platform))

logger.debug("Runner map: %s", runner_bind_map)

circus_socket_map["_bento_api_server"] = CircusSocket(
name="_bento_api_server",
circus_socket_map[API_SERVER_NAME] = CircusSocket(
name=API_SERVER_NAME,
host=host,
port=port,
backlog=backlog,
Expand All @@ -316,7 +298,7 @@ def serve_production(
SCRIPT_API_SERVER,
bento_identifier,
"--bind",
"fd://$(circus.sockets._bento_api_server)",
f"fd://$(circus.sockets.{API_SERVER_NAME})",
"--runner-map",
json.dumps(runner_bind_map),
"--working-dir",
Expand Down
27 changes: 5 additions & 22 deletions bentoml/_internal/server/cli/grpc/dev_api_server.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import typing as t
from typing import TYPE_CHECKING
import asyncio
from urllib.parse import urlparse

import click
import psutil
Expand All @@ -11,9 +11,6 @@
from bentoml._internal.context import component_context
from bentoml._internal.configuration.containers import BentoMLContainer

if TYPE_CHECKING:
from ...grpc_server import GRPCServer


@click.command()
@click.argument("bento_identifier", type=click.STRING, required=False, default=".")
Expand All @@ -28,9 +25,8 @@ def main(
bento_identifier: str,
bind: str,
working_dir: str | None,
prometheus_dir: t.Optional[str],
prometheus_dir: str | None,
):
import asyncio

component_context.component_name = "grpc_dev_api_server"

Expand All @@ -52,22 +48,9 @@ def main(
if psutil.WINDOWS:
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) # type: ignore

cleanup: list[t.Coroutine[t.Any, t.Any, None]] = []

loop = asyncio.get_event_loop()

async def start_grpc_server(srv: GRPCServer) -> None:
srv.add_insecure_port(bind)

await srv.start()
cleanup.append(srv.stop())
await srv.wait_for_termination()
parsed = urlparse(bind)

try:
loop.run_until_complete(start_grpc_server(svc.grpc_server))
finally:
loop.run_until_complete(*cleanup)
loop.close()
svc.grpc_server.run(bind_addr=f"[::]:{parsed.port}")


if __name__ == "__main__":
Expand Down
5 changes: 2 additions & 3 deletions bentoml/_internal/server/cli/http/api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def main(
watcher = Watcher(
name="bento_api_server",
cmd=sys.executable,
args=["-m", "bentoml._internal.server.cli.api_server"]
args=["-m", "bentoml._internal.server.cli.http.api_server"]
+ unparse_click_params(params, ctx.command.params, factory=str),
copy_env=True,
numprocesses=1,
Expand Down Expand Up @@ -130,13 +130,12 @@ def main(

asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) # type: ignore

app = svc.asgi_app
assert parsed.scheme == "fd"

# skip the uvicorn internal supervisor
fd = int(parsed.netloc)
sock = socket.socket(fileno=fd)
config = uvicorn.Config(app, **uvicorn_options)
config = uvicorn.Config(svc.asgi_app, **uvicorn_options)
uvicorn.Server(config).run(sockets=[sock])


Expand Down
5 changes: 2 additions & 3 deletions bentoml/_internal/server/cli/http/dev_api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import click
import psutil
import uvicorn

from bentoml import load
from bentoml._internal.log import configure_server_logging
Expand Down Expand Up @@ -63,13 +64,11 @@ def main(

asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) # type: ignore

import uvicorn # type: ignore

config = uvicorn.Config(svc.asgi_app, **uvicorn_options)
uvicorn.Server(config).run(sockets=[sock])
else:
raise ValueError(f"Unsupported bind scheme: {bind}")


if __name__ == "__main__":
main() # pylint: disable=no-value-for-parameter
main()
5 changes: 5 additions & 0 deletions bentoml/_internal/server/grpc/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Package entry point: re-export the gRPC server object and the two servicer
# registration callables so they can be imported directly from this package.
from .server import GRPCServer
from .servicer import register_bento_servicer
from .servicer import register_health_servicer

# Names exported by a star-import of this package; every entry corresponds to
# one of the imports above.
__all__ = ["GRPCServer", "register_health_servicer", "register_bento_servicer"]
Empty file.

0 comments on commit 19f9e54

Please sign in to comment.