fix(runner): receive requests before cork (#2996)
## What does this PR address?
<!--
Thanks for sending a pull request!

Congrats on making it this far! Here's a 🍱 for you. There are still a
few steps ahead.

Please make sure to read the contribution guidelines, then fill out the
blanks below before requesting a code review.

Name your Pull Request with one of the following prefixes, e.g. "feat:
add support for PyTorch", to indicate the type of changes proposed. This
is based on the [Conventional Commits
specification](https://www.conventionalcommits.org/en/v1.0.0/#summary).
  - feat: (new feature for the user, not a new feature for build script)
  - fix: (bug fix for the user, not a fix to a build script)
  - docs: (changes to the documentation)
  - style: (formatting, missing semicolons, etc.; no production code change)
  - refactor: (refactoring production code, e.g. renaming a variable)
  - perf: (code changes that improve performance)
  - test: (adding missing tests, refactoring tests; no production code change)
  - chore: (updating grunt tasks etc.; no production code change)
  - build: (changes that affect the build system or external dependencies)
  - ci: (changes to configuration files and scripts)
  - revert: (reverts a previous commit)

Describe your changes in detail. Attach screenshots here if appropriate.

Once you're done with this, someone from the BentoML team or a community
member will help review your PR (see the "Who can help review?" section
for potential reviewers). If no one has reviewed your PR after a week
has passed, don't hesitate to post a new comment and @-ping the same
person. Notifications sometimes get lost 🥲.
-->

<!-- Remove if not applicable -->
Fixes #(issue)

## Before submitting:
<!--- Go over all the following points, and put an `x` in all the boxes
that apply. -->
<!--- If you're unsure about any of these, don't hesitate to ask. We're
here to help! -->
<!--- If you plan to update documentation or tests in follow-up, please
note -->
- [ ] Does the Pull Request follow the [Conventional Commits
specification](https://www.conventionalcommits.org/en/v1.0.0/#summary)
naming? Here is [GitHub's
guide](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/creating-a-pull-request)
on how to create a pull request.
- [ ] Does the code follow BentoML's code style? Have both the `make format` and `make lint` scripts passed
([instructions](https://github.com/bentoml/BentoML/blob/main/DEVELOPMENT.md#style-check-auto-formatting-type-checking))?
- [ ] Did you read through [contribution
guidelines](https://github.com/bentoml/BentoML/blob/main/CONTRIBUTING.md#ways-to-contribute)
and follow [development
guidelines](https://github.com/bentoml/BentoML/blob/main/DEVELOPMENT.md#start-developing)?
- [ ] Did your changes require updates to the documentation? Have you
updated those accordingly? Here are the [documentation
guidelines](https://github.com/bentoml/BentoML/tree/main/docs) and [tips
on writing
docs](https://github.com/bentoml/BentoML/tree/main/docs#writing-documentation).
- [ ] Did you write tests to cover your changes?

## Who can help review?

Feel free to tag members/contributors who can help review your PR.
<!--
Feel free to ping any of the BentoML members for help on your issue, but
don't ping more than three people 😊.
If you know how to use git blame, that is probably the easiest way.

Team members that you can ping:
- @parano
- @yubozhao
- @bojiang
- @ssheng
- @aarnphm
- @sauyon
- @larme
- @yetone
- @jjmachan
-->
bojiang committed Sep 16, 2022
1 parent fc39429 commit bf1e8b5
Showing 2 changed files with 49 additions and 44 deletions.
bentoml/_internal/marshal/dispatcher.py (1 addition, 1 deletion)
```diff
@@ -149,7 +149,7 @@ def _wake_event(self):
     def __call__(
         self,
         callback: t.Callable[
-            [t.Collection[T_IN]], t.Coroutine[None, None, t.Collection[T_OUT]]
+            [t.Sequence[T_IN]], t.Coroutine[None, None, t.Sequence[T_OUT]]
         ],
     ) -> t.Callable[[T_IN], t.Coroutine[None, None, T_OUT]]:
         self.callback = callback
```
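Context for this one-line change: `CorkDispatcher` turns a coroutine over a batch of inputs into a coroutine over a single input by "corking" concurrent callers into one batch and fanning results back out by position. Positional fan-out is why the callback's type moves from `t.Collection` to `t.Sequence`: a bare Collection promises neither ordering nor indexing, while the dispatcher relies on output `i` corresponding to input `i`. The toy sketch below only illustrates the pattern; it is not BentoML's implementation (which sizes its batching window adaptively), and `naive_cork` and `window` are hypothetical names.

```python
import asyncio
import typing as t

T_IN = t.TypeVar("T_IN")
T_OUT = t.TypeVar("T_OUT")


def naive_cork(
    batch_handler: t.Callable[[t.Sequence[T_IN]], t.Awaitable[t.Sequence[T_OUT]]],
    window: float = 0.01,
) -> t.Callable[[T_IN], t.Awaitable[T_OUT]]:
    # Pending inputs paired with the futures their callers are awaiting.
    queue: list[tuple[T_IN, asyncio.Future]] = []

    async def flush() -> None:
        await asyncio.sleep(window)  # "cork": hold the batch open briefly
        batch, queue[:] = queue[:], []  # drain the queue atomically
        outputs = await batch_handler([item for item, _ in batch])
        # Results map back to callers by position, which is why the
        # callback is typed over an ordered Sequence, not a Collection.
        for (_, fut), out in zip(batch, outputs):
            fut.set_result(out)

    async def call_one(item: T_IN) -> T_OUT:
        loop = asyncio.get_running_loop()
        fut: asyncio.Future = loop.create_future()
        if not queue:  # first caller in this window schedules the flush
            loop.create_task(flush())
        queue.append((item, fut))
        return await fut

    return call_one
```

With this shape, `infer = naive_cork(run_batch)` followed by `await asyncio.gather(infer(a), infer(b))` produces a single `run_batch([a, b])` call, and each caller gets back the output at its own index.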
bentoml/_internal/server/runner_app.py (48 additions, 43 deletions)
```diff
@@ -11,6 +11,8 @@
 from simple_di import inject
 from simple_di import Provide
 
+from bentoml._internal.types import LazyType
+
 from ..context import trace_context
 from ..context import component_context
 from ..runner.utils import Params
@@ -19,6 +21,7 @@
 from ..utils.metrics import metric_name
 from ..utils.metrics import exponential_buckets
 from ..server.base_app import BaseAppFactory
+from ..runner.container import Payload
 from ..runner.container import AutoContainer
 from ..marshal.dispatcher import CorkDispatcher
 from ..configuration.containers import BentoMLContainer
@@ -138,13 +141,10 @@ def routes(self) -> t.List[BaseRoute]:
         for method in self.runner.runner_methods:
             path = "/" if method.name == "__call__" else "/" + method.name
             if method.config.batchable:
-                _func = self.dispatchers[method.name](
-                    self._async_cork_run(runner_method=method)
-                )
                 routes.append(
                     Route(
                         path=path,
-                        endpoint=_func,
+                        endpoint=self._mk_request_handler(runner_method=method),
                         methods=["POST"],
                     )
                 )
@@ -205,32 +205,30 @@ def client_request_hook(span: Span, _scope: t.Dict[str, t.Any]) -> None:
 
         return middlewares
 
-    def _async_cork_run(
+    def _mk_request_handler(
         self,
         runner_method: RunnerMethod[t.Any, t.Any, t.Any],
-    ) -> t.Callable[[t.Collection[Request]], t.Coroutine[None, None, list[Response]]]:
+    ) -> t.Callable[[Request], t.Coroutine[None, None, Response]]:
         from starlette.responses import Response
 
-        async def _run(requests: t.Collection[Request]) -> list[Response]:
-            assert self._is_ready
+        server_str = f"BentoML-Runner/{self.runner.name}/{runner_method.name}/{self.worker_index}"
 
-            self.legacy_adaptive_batch_size_hist_map[runner_method.name].observe(
-                len(requests)
+        async def infer_batch(
+            params_list: t.Sequence[Params[t.Any]],
+        ) -> list[Payload] | list[tuple[Payload, ...]]:
+            self.legacy_adaptive_batch_size_hist_map[runner_method.name].observe(  # type: ignore
+                len(params_list)
             )
-            self.adaptive_batch_size_hist.labels(
+            self.adaptive_batch_size_hist.labels(  # type: ignore
                 runner_name=self.runner.name,
                 worker_index=self.worker_index,
                 method_name=runner_method.name,
                 service_version=component_context.bento_version,
                 service_name=component_context.bento_name,
-            ).observe(len(requests))
+            ).observe(len(params_list))
 
-            if not requests:
+            if not params_list:
                 return []
-            params_list: list[Params[t.Any]] = []
-            for r in requests:
-                r_ = await r.body()
-                params_list.append(pickle.loads(r_))
 
             input_batch_dim, output_batch_dim = runner_method.config.batch_dim
 
@@ -242,50 +240,57 @@ async def _run(requests: t.Collection[Request]) -> list[Response]:
                 *batched_params.args, **batched_params.kwargs
             )
 
-            server_str = f"BentoML-Runner/{self.runner.name}/{runner_method.name}/{self.worker_index}"
-
             # multiple output branch
-            if isinstance(batch_ret, tuple):
+            if LazyType["tuple[t.Any, ...]"](tuple).isinstance(batch_ret):
                 output_num = len(batch_ret)
-                payloadss = [
+                payloadss = tuple(
                     AutoContainer.batch_to_payloads(
                         batch_ret[idx], indices, batch_dim=output_batch_dim
                     )
                     for idx in range(output_num)
-                ]
-
-                return [
-                    Response(
-                        pickle.dumps(payloads),
-                        headers={
-                            PAYLOAD_META_HEADER: json.dumps({}),
-                            "Content-Type": "application/vnd.bentoml.multiple_outputs",
-                            "Server": server_str,
-                        },
-                    )
-                    for payloads in zip(*payloadss)
-                ]
+                )
+                ret = list(zip(*payloadss))
+                return ret
 
             # single output branch
             payloads = AutoContainer.batch_to_payloads(
                 batch_ret,
                 indices,
                 batch_dim=output_batch_dim,
             )
+            return payloads
 
-            return [
-                Response(
-                    payload.data,
+        infer = self.dispatchers[runner_method.name](infer_batch)
+
+        async def _request_handler(request: Request) -> Response:
+            assert self._is_ready
+
+            r_: bytes = await request.body()
+            params: Params[t.Any] = pickle.loads(r_)
+
+            payload = await infer(params)
+
+            if not isinstance(
+                payload, Payload
+            ):  # a tuple, which means user runnable has multiple outputs
+                return Response(
+                    pickle.dumps(payload),
                     headers={
-                        PAYLOAD_META_HEADER: json.dumps(payload.meta),
-                        "Content-Type": f"application/vnd.bentoml.{payload.container}",
+                        PAYLOAD_META_HEADER: json.dumps({}),
+                        "Content-Type": "application/vnd.bentoml.multiple_outputs",
                         "Server": server_str,
                     },
                 )
-                for payload in payloads
-            ]
+            return Response(
+                payload.data,
+                headers={
+                    PAYLOAD_META_HEADER: json.dumps(payload.meta),
+                    "Content-Type": f"application/vnd.bentoml.{payload.container}",
+                    "Server": server_str,
+                },
+            )
 
-        return _run
+        return _request_handler
 
     def async_run(
         self,
@@ -302,7 +307,7 @@ async def _run(request: Request) -> Response:
 
             try:
                 ret = await runner_method.async_run(*params.args, **params.kwargs)
-            except BaseException as exc:
+            except Exception as exc:  # pylint: disable=broad-except
                 logger.error(
                     f"Exception on runner '{runner_method.runner.name}' method '{runner_method.name}'",
                     exc_info=exc,
```
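Taken together, the change inverts the order of I/O and batching. Before, the dispatcher corked raw `Request` objects, so every body was still being received inside the batch callback: one client with a slow upload could stall the whole batch, and the adaptive batcher's measurements included network time. After, `_request_handler` awaits `request.body()` and unpickles the `Params` first, so the dispatcher only ever corks ready-to-run work, hence the title "receive requests before cork". A minimal sketch of the two shapes follows; `run_batch`, `make_handler`, `dispatch`, and the stub `Params`/`Payload` types are hypothetical stand-ins, not BentoML's API:

```python
import pickle
import typing as t

Params = t.Any   # stand-in for bentoml's Params bundle of args/kwargs
Payload = bytes  # stand-in for bentoml's serialized Payload container


async def run_batch(params_list: t.Sequence[Params]) -> list[Payload]:
    """Stand-in for the batched model call (runner_method.async_run)."""
    return [pickle.dumps(p) for p in params_list]


# Before: whole requests were corked; bodies were received *inside* the
# batch callback, so one slow upload delayed every request in the batch.
async def old_batch_callback(requests: t.Sequence[t.Any]) -> list[Payload]:
    params_list = [pickle.loads(await r.body()) for r in requests]
    return await run_batch(params_list)


# After: receive and deserialize first, then cork only ready Params.
def make_handler(
    dispatch: t.Callable[..., t.Callable[[Params], t.Awaitable[Payload]]],
) -> t.Callable[[t.Any], t.Awaitable[Payload]]:
    infer = dispatch(run_batch)  # per-item view of the batch function

    async def handler(request: t.Any) -> Payload:
        body = await request.body()  # network read happens before corking
        params = pickle.loads(body)
        return await infer(params)

    return handler
```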
