/
__init__.py
116 lines (90 loc) · 3.63 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
from __future__ import annotations
import typing as t
import logging
import importlib
from typing import TYPE_CHECKING
from inspect import isawaitable
from ....utils import LazyLoader
from .....grpc.utils import import_generated_stubs
from .....grpc.utils import LATEST_PROTOCOL_VERSION
# Module-level logger named after this package.
logger = logging.getLogger(__name__)

if TYPE_CHECKING:
    # Everything in this branch is visible to static type checkers only; it is
    # never executed at runtime, so these imports add no import-time cost.
    from types import ModuleType

    from grpc import aio
    from grpc_health.v1 import health
    from typing_extensions import Self

    from bentoml.grpc.v1 import service_pb2_grpc as services
    from bentoml.grpc.types import Interceptors
    from bentoml.grpc.types import AddServicerFn
    from bentoml.grpc.types import ServicerClass

    from ....service.service import Service

    class ServicerModule(ModuleType):
        """Typing-only shape of the module returned by ``Servicer._servicer_module``."""

        @staticmethod
        def create_bento_servicer(service: Service) -> services.BentoServiceServicer:
            ...

else:
    # Runtime branch: bind ``health`` and ``struct_pb2`` through LazyLoader
    # instead of importing them directly.
    # NOTE(review): LazyLoader presumably defers the real import until first
    # attribute access and reports ``exc_msg`` when the package is missing --
    # confirm against its definition in the utils package.
    health = LazyLoader(
        "health",
        globals(),
        "grpc_health.v1.health",
        exc_msg=(
            "'grpcio-health-checking' is required for using health checking "
            "endpoints. Install with 'pip install grpcio-health-checking'."
        ),
    )
    struct_pb2 = LazyLoader("struct_pb2", globals(), "google.protobuf.struct_pb2")
class Servicer:
    """Create an instance of gRPC Servicer.

    The constructor only records configuration. The servicer objects themselves
    are built by :meth:`load`, which must be called exactly once before the
    attributes ``interceptors_stack``, ``bento_servicer``, ``health_servicer``
    and ``service_names`` exist. An instance is truthy once it has been loaded.

    Args:
        service: The service object handed to ``create_bento_servicer`` of the
            selected protocol module.
        on_startup: Zero-argument callables run, in order, by :meth:`startup`.
            A callable may return an awaitable, which is then awaited.
        on_shutdown: Zero-argument callables run, in order, by
            :meth:`shutdown`, with the same awaitable handling.
        mount_servicers: Extra ``(servicer class, add-servicer function,
            service names)`` entries. They are only stored on the instance;
            this class does not use them itself.
        interceptors: Zero-argument callables (typically interceptor classes);
            each is called once by :meth:`build_interceptors_stack`.
        protocol_version: Name of the submodule of this package that provides
            ``create_bento_servicer``; also passed to
            ``import_generated_stubs``.
    """

    # Class-level default. The resolved protocol module is stored on the
    # instance the first time ``_servicer_module`` is read.
    _cached_module = None

    def __init__(
        self: Self,
        service: Service,
        on_startup: t.Sequence[t.Callable[[], t.Any]] | None = None,
        on_shutdown: t.Sequence[t.Callable[[], t.Any]] | None = None,
        mount_servicers: t.Sequence[tuple[ServicerClass, AddServicerFn, list[str]]]
        | None = None,
        interceptors: Interceptors | None = None,
        protocol_version: str = LATEST_PROTOCOL_VERSION,
    ) -> None:
        self.bento_service = service
        # Each optional sequence is copied into a fresh list so that later
        # mutation of these attributes never touches the caller's object.
        self.on_startup = list(on_startup) if on_startup else []
        self.on_shutdown = list(on_shutdown) if on_shutdown else []
        self.mount_servicers = list(mount_servicers) if mount_servicers else []
        self.interceptors = list(interceptors) if interceptors else []
        self.protocol_version = protocol_version
        self.loaded = False

    @property
    def _servicer_module(self) -> ServicerModule:
        """Return the protocol-version submodule, importing it on first access.

        The module is imported relative to this package using
        ``protocol_version`` as the submodule name and is cached on the
        instance afterwards.
        """
        module = self._cached_module
        if module is None:
            module = importlib.import_module(
                f".{self.protocol_version}", package=__name__
            )
            # Written through object.__setattr__, as the original code did, so
            # the cache write bypasses any overridden ``__setattr__``.
            object.__setattr__(self, "_cached_module", module)
        return t.cast("ServicerModule", module)

    def load(self) -> None:
        """Build the interceptors, the bento servicer and the health servicer.

        Raises:
            AssertionError: If the servicer has already been loaded.
        """
        # Raised explicitly rather than via ``assert`` so the guard is still
        # enforced when Python runs with ``-O`` (which strips assert statements).
        if self.loaded:
            raise AssertionError("Servicer.load() must not be called more than once.")

        pb, _ = import_generated_stubs(self.protocol_version)

        self.interceptors_stack = self.build_interceptors_stack()
        self.bento_servicer = self._servicer_module.create_bento_servicer(
            self.bento_service
        )

        # Create a health check servicer. We use the non-blocking implementation
        # to avoid thread starvation.
        self.health_servicer = health.aio.HealthServicer()

        # Full names of every service in the generated descriptor, followed by
        # the health checking service name.
        self.service_names = tuple(
            service.full_name for service in pb.DESCRIPTOR.services_by_name.values()
        ) + (health.SERVICE_NAME,)
        self.loaded = True

    def build_interceptors_stack(self) -> list[aio.ServerInterceptor]:
        """Call every configured interceptor factory and return the results in order."""
        return [interceptor() for interceptor in self.interceptors]

    @staticmethod
    async def _run_handlers(handlers: t.Iterable[t.Callable[[], t.Any]]) -> None:
        """Call each handler in order, awaiting any awaitable it returns."""
        for handler in handlers:
            result = handler()
            if isawaitable(result):
                await result

    async def startup(self) -> None:
        """Run the ``on_startup`` handlers sequentially."""
        await self._run_handlers(self.on_startup)

    async def shutdown(self) -> None:
        """Run the ``on_shutdown`` handlers sequentially."""
        await self._run_handlers(self.on_shutdown)

    def __bool__(self) -> bool:
        """Return ``True`` once :meth:`load` has completed."""
        return self.loaded