Skip to content

Commit

Permalink
feat(grpc): --production (#2865)
Browse files Browse the repository at this point in the history
Co-authored-by: bojiang <5886138+bojiang@users.noreply.github.com>
  • Loading branch information
aarnphm and bojiang committed Aug 5, 2022
1 parent 258f5ec commit 8f2a36e
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 38 deletions.
14 changes: 10 additions & 4 deletions bentoml/_internal/cli/bento_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ def add_serve_command(cli: click.Group) -> None:
help="The maximum number of pending connections.",
show_default=True,
)
@click.option(
"--max-concurrent-streams",
type=click.INT,
default=BentoMLContainer.grpc.max_concurrent_streams.get(),
help="Maximum number of concurrent incoming streams to allow on a HTTP/2 connection.",
show_default=True,
)
@click.option(
"--reload",
type=click.BOOL,
Expand Down Expand Up @@ -82,6 +89,7 @@ def serve(
host: t.Optional[str],
api_workers: t.Optional[int],
backlog: int,
max_concurrent_streams: int,
reload: bool,
working_dir: str,
grpc: bool,
Expand Down Expand Up @@ -127,10 +135,6 @@ def serve(
logger.warning(
"'--reload' is not supported with '--production'; ignoring"
)
if grpc:
logger.warning(
"'--grpc' is not supported with '--production' yet; ignoring"
)

from ..server import serve_production

Expand All @@ -140,7 +144,9 @@ def serve(
port=port,
host=BentoMLContainer.service_host.get() if host is None else host,
backlog=backlog,
max_concurrent_streams=max_concurrent_streams,
api_workers=api_workers,
grpc=grpc,
)
else:
from ..server import serve_development
Expand Down
1 change: 1 addition & 0 deletions bentoml/_internal/configuration/containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def _is_ip_address(addr: str) -> bool:
"access_control_expose_headers": Or([str], str, None),
},
"grpc": {
"max_concurrent_streams": Or(int, None),
"max_message_length": Or(int, None),
"maximum_concurrent_rpcs": Or(int, None),
},
Expand Down
6 changes: 4 additions & 2 deletions bentoml/_internal/configuration/default_configuration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ api_server:
access_control_max_age: Null
access_control_expose_headers: Null
grpc:
max_message_length: Null
maximum_concurrent_rpcs: Null
# https://github.com/grpc/grpc/blob/ebfb028a29ef5f0f4be6f33c131c740ae0a32c84/src/core/ext/transport/chttp2/transport/http2_settings.cc#L51
max_concurrent_streams: ~
max_message_length: ~
maximum_concurrent_rpcs: ~

runners:
batching:
Expand Down
43 changes: 32 additions & 11 deletions bentoml/_internal/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from simple_di import Provide

from bentoml import load
from bentoml.exceptions import UnprocessableEntity

from ..log import SERVER_LOGGING_CONFIG
from ..utils import reserve_free_port
Expand All @@ -30,6 +31,7 @@

SCRIPT_RUNNER = "bentoml._internal.server.cli.runner"
SCRIPT_API_SERVER = "bentoml._internal.server.cli.http_api_server"
SCRIPT_GRPC_API_SERVER = "bentoml._internal.server.cli.grpc_api_server"
SCRIPT_DEV_API_SERVER = "bentoml._internal.server.cli.http_dev_api_server"
SCRIPT_GRPC_DEV_API_SERVER = "bentoml._internal.server.cli.grpc_dev_api_server"

Expand Down Expand Up @@ -105,11 +107,11 @@ def serve_development(
if grpc:
watcher_name = "grpc_dev_api_server"
script_to_use = SCRIPT_GRPC_DEV_API_SERVER
socket_path = f"tcp://{host}:{port}"
bind_address = f"tcp://{host}:{port}"
else:
watcher_name = "dev_api_server"
script_to_use = SCRIPT_DEV_API_SERVER
socket_path = f"fd://$(circus.sockets.{API_SERVER_NAME})"
bind_address = f"fd://$(circus.sockets.{API_SERVER_NAME})"

circus_sockets: list[CircusSocket] = []
circus_sockets.append(
Expand All @@ -125,7 +127,7 @@ def serve_development(
script_to_use,
bento_identifier,
"--bind",
socket_path,
bind_address,
"--working-dir",
working_dir,
"--prometheus-dir",
Expand Down Expand Up @@ -182,21 +184,28 @@ def serve_production(
port: int = Provide[BentoMLContainer.api_server_config.port],
host: str = Provide[BentoMLContainer.api_server_config.host],
backlog: int = Provide[BentoMLContainer.api_server_config.backlog],
max_concurrent_streams: int = Provide[BentoMLContainer.grpc.max_concurrent_streams],
api_workers: t.Optional[int] = None,
grpc: bool = False,
) -> None:
working_dir = os.path.realpath(os.path.expanduser(working_dir))
svc = load(bento_identifier, working_dir=working_dir, standalone_load=True)

from circus.sockets import CircusSocket
from circus.watcher import Watcher

watchers: t.List[Watcher] = []
circus_socket_map: t.Dict[str, CircusSocket] = {}
runner_bind_map: t.Dict[str, str] = {}
watchers: list[Watcher] = []
circus_socket_map: dict[str, CircusSocket] = {}
runner_bind_map: dict[str, str] = {}
uds_path = None

prometheus_dir = ensure_prometheus_dir()

if grpc and psutil.WINDOWS:
raise UnprocessableEntity(
"'grpc' is not supported on Windows with '--production'. The reason being SO_REUSEPORT socket option is only available on UNIX system, and gRPC implementation depends on this behaviour."
)

if psutil.POSIX:
# use AF_UNIX sockets for Circus
uds_path = tempfile.mkdtemp()
Expand Down Expand Up @@ -283,28 +292,40 @@ def serve_production(

logger.debug("Runner map: %s", runner_bind_map)

if grpc:
watcher_name = "grpc_api_server"
script_to_use = SCRIPT_GRPC_API_SERVER
socket_path = f"tcp://{host}:{port}"
# num_connect_args = ["--max-concurrent-streams", f"{max_concurrent_streams}"]
num_connect_args = []
else:
watcher_name = "api_server"
script_to_use = SCRIPT_API_SERVER
socket_path = f"fd://$(circus.sockets.{API_SERVER_NAME})"
num_connect_args = ["--backlog", f"{backlog}"]

circus_socket_map[API_SERVER_NAME] = CircusSocket(
name=API_SERVER_NAME,
host=host,
port=port,
backlog=backlog,
)

watchers.append(
Watcher(
name="api_server",
name=watcher_name,
cmd=sys.executable,
args=[
"-m",
SCRIPT_API_SERVER,
script_to_use,
bento_identifier,
"--bind",
f"fd://$(circus.sockets.{API_SERVER_NAME})",
socket_path,
"--runner-map",
json.dumps(runner_bind_map),
"--working-dir",
working_dir,
"--backlog",
f"{backlog}",
*num_connect_args,
"--worker-id",
"$(CIRCUS.WID)",
"--prometheus-dir",
Expand Down
116 changes: 116 additions & 0 deletions bentoml/_internal/server/cli/grpc_api_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
from __future__ import annotations

import sys
import json
from urllib.parse import urlparse

import click


@click.command()
@click.argument("bento_identifier", type=click.STRING, required=False, default=".")
@click.option(
"--bind",
type=click.STRING,
required=True,
help="Bind address sent to circus. This address accepts the following values: 'tcp://127.0.0.1:3000','unix:///tmp/bento_api.sock', 'fd://12'",
)
@click.option(
"--runner-map",
type=click.STRING,
envvar="BENTOML_RUNNER_MAP",
help="JSON string of runners map, default sets to envars `BENTOML_RUNNER_MAP`",
)
@click.option(
"--working-dir",
type=click.Path(exists=True),
help="Working directory for the API server",
)
@click.option(
"--prometheus-dir",
type=click.Path(exists=True),
help="Required by prometheus to pass the metrics in multi-process mode",
)
@click.option(
"--worker-id",
required=False,
type=click.INT,
default=None,
help="If set, start the server as a bare worker with the given worker ID. Otherwise start a standalone server with a supervisor process.",
)
@click.pass_context
def main(
ctx: click.Context,
bento_identifier: str,
bind: str,
runner_map: str | None,
working_dir: str | None,
worker_id: int | None,
prometheus_dir: str | None,
):
"""
Start BentoML API server.
\b
This is an internal API, users should not use this directly. Instead use `bentoml serve <path> [--options]`
"""

import bentoml
from bentoml._internal.log import configure_server_logging
from bentoml._internal.context import component_context
from bentoml._internal.configuration.containers import BentoMLContainer

configure_server_logging()

BentoMLContainer.development_mode.set(False)
if prometheus_dir is not None:
BentoMLContainer.prometheus_multiproc_dir.set(prometheus_dir)

if worker_id is None:
# Start a standalone server with a supervisor process
from circus.watcher import Watcher

from bentoml._internal.server import ensure_prometheus_dir
from bentoml._internal.utils.click import unparse_click_params
from bentoml._internal.utils.circus import create_standalone_arbiter

ensure_prometheus_dir()
parsed = urlparse(bind)
params = ctx.params
params["max_concurrent_streams"] = f"tcp://0.0.0.0:{parsed.port}"
params["worker_id"] = "$(circus.wid)"
watcher = Watcher(
name="bento_api_server",
cmd=sys.executable,
args=["-m", "bentoml._internal.server.cli.grpc_api_server"]
+ unparse_click_params(params, ctx.command.params, factory=str),
copy_env=True,
numprocesses=1,
stop_children=True,
working_dir=working_dir,
)
arbiter = create_standalone_arbiter(watchers=[watcher])
arbiter.start()
return

component_context.component_name = f"api_server:{worker_id}"

if runner_map is not None:
BentoMLContainer.remote_runner_mapping.set(json.loads(runner_map))
svc = bentoml.load(bento_identifier, working_dir=working_dir, standalone_load=True)

# setup context
if svc.tag is None:
component_context.bento_name = f"*{svc.__class__.__name__}"
component_context.bento_version = "not available"
else:
component_context.bento_name = svc.tag.name
component_context.bento_version = svc.tag.version

parsed = urlparse(bind)
assert parsed.scheme == "tcp"

svc.grpc_server.run(bind_addr=f"[::]:{parsed.port}")


if __name__ == "__main__":
main()
14 changes: 8 additions & 6 deletions bentoml/_internal/server/cli/grpc_dev_api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,6 @@
import click
import psutil

from bentoml import load
from bentoml._internal.log import configure_server_logging
from bentoml._internal.context import component_context
from bentoml._internal.configuration.containers import BentoMLContainer


@click.command()
@click.argument("bento_identifier", type=click.STRING, required=False, default=".")
Expand All @@ -27,6 +22,10 @@ def main(
working_dir: str | None,
prometheus_dir: str | None,
):
from bentoml import load
from bentoml._internal.log import configure_server_logging
from bentoml._internal.context import component_context
from bentoml._internal.configuration.containers import BentoMLContainer

component_context.component_name = "grpc_dev_api_server"

Expand All @@ -50,7 +49,10 @@ def main(

parsed = urlparse(bind)

svc.grpc_server.run(bind_addr=f"[::]:{parsed.port}")
if parsed.scheme == "tcp":
svc.grpc_server.run(bind_addr=f"[::]:{parsed.port}")
else:
raise ValueError(f"Unsupported bind scheme: {bind}")


if __name__ == "__main__":
Expand Down

0 comments on commit 8f2a36e

Please sign in to comment.