-
Notifications
You must be signed in to change notification settings - Fork 618
/
service.py
135 lines (115 loc) · 4.28 KB
/
service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""Reliably launch and connect to backend server process (wandb service).
Backend server process can be connected to using tcp sockets or grpc transport.
"""
import os
import platform
import subprocess
import tempfile
import time
from typing import Any, Dict, Optional
from . import port_file
from .service_base import ServiceInterface
from .service_sock import ServiceSockInterface
class _Service:
    """Launch the wandb service backend process and record its ports.

    The child is started as ``<python> -m wandb service`` and is handed the
    path of a temporary port file.  This class polls that file until it holds
    valid port information, then exposes the ports through read-only
    properties.  Depending on ``_use_grpc`` the grpc or the socket service
    interface is instantiated.
    """

    _grpc_port: Optional[int]
    _sock_port: Optional[int]
    _service_interface: "ServiceInterface"
    _internal_proc: Optional[subprocess.Popen]

    def __init__(
        self,
        _python_executable: str,
        _use_grpc: bool = False,
    ) -> None:
        """Store the launch configuration and create the service interface.

        Args:
            _python_executable: Interpreter used to run the service module.
            _use_grpc: When true use the grpc interface, otherwise the socket
                interface.
        """
        self._use_grpc = _use_grpc
        self._python_executable = _python_executable
        self._stub = None
        self._grpc_port = None
        self._sock_port = None
        self._internal_proc = None

        # current code only supports grpc or socket server implementation, in the
        # future we might be able to support both
        if _use_grpc:
            # Imported lazily so the grpc interface module is only loaded when
            # it is actually requested.
            from .service_grpc import ServiceGrpcInterface

            self._service_interface = ServiceGrpcInterface()
        else:
            self._service_interface = ServiceSockInterface()

    def _wait_for_ports(
        self,
        fname: str,
        proc: Optional[subprocess.Popen] = None,
        timeout: float = 30.0,
    ) -> bool:
        """Poll the port file until it is valid, the child exits, or time is up.

        Args:
            fname: Path of the port file the service is given.
            proc: The launched service process, if any; used to stop waiting
                as soon as the process has terminated.
            timeout: Maximum number of seconds to wait.

        Returns:
            True when the ports were read and stored on ``self``; False when
            the process exited, reading the port file raised, or the timeout
            elapsed.
        """
        # A monotonic clock keeps the deadline immune to wall-clock changes.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            # ``poll()`` returns None while the child is alive and its exit
            # code once it has finished.  That code may be 0, so compare
            # against None instead of relying on truthiness.
            if proc is not None and proc.poll() is not None:
                print("proc exited with", proc.returncode)
                return False
            if not os.path.isfile(fname):
                time.sleep(0.2)
                continue
            try:
                pf = port_file.PortFile()
                pf.read(fname)
                if not pf.is_valid:
                    # File exists but is not valid yet; try again shortly.
                    time.sleep(0.2)
                    continue
                self._grpc_port = pf.grpc_port
                self._sock_port = pf.sock_port
            except Exception as e:
                print("Error:", e)
                return False
            return True
        return False

    def _launch_server(self) -> None:
        """Launch server and set ports.

        Raises:
            AssertionError: If the ports could not be obtained from the port
                file (process exited, read error, or timeout).
        """
        # References for starting processes
        # - https://github.com/wandb/wandb/blob/archive/old-cli/wandb/__init__.py
        # - https://stackoverflow.com/questions/1196074/how-to-start-a-background-process-in-python
        kwargs: Dict[str, Any] = dict(close_fds=True)
        # flags to handle keyboard interrupt signal that is causing a hang
        if platform.system() == "Windows":
            kwargs.update(creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)  # type: ignore [attr-defined]
        else:
            kwargs.update(start_new_session=True)

        pid = os.getpid()

        # The port file lives in a temporary directory that is removed once
        # the ports have been read (or the wait has failed).
        with tempfile.TemporaryDirectory() as tmpdir:
            fname = os.path.join(tmpdir, f"port-{pid}.txt")

            exec_cmd_list = [self._python_executable, "-m"]
            # Add coverage collection if needed
            if os.environ.get("YEA_RUN_COVERAGE") and os.environ.get("COVERAGE_RCFILE"):
                exec_cmd_list += ["coverage", "run", "-m"]
            service_args = [
                "wandb",
                "service",
                "--port-filename",
                fname,
                "--pid",
                str(pid),
                "--debug",
            ]
            if self._use_grpc:
                service_args.append("--serve-grpc")
            else:
                service_args.append("--serve-sock")

            internal_proc = subprocess.Popen(
                exec_cmd_list + service_args,
                env=os.environ,
                **kwargs,
            )
            if not self._wait_for_ports(fname, proc=internal_proc):
                # Raised explicitly rather than via ``assert`` so the check is
                # not stripped under ``python -O``; the exception type is kept
                # for callers that already handle AssertionError.
                raise AssertionError("wandb service did not report its ports")
            self._internal_proc = internal_proc

    def start(self) -> None:
        """Start the service process and wait for its ports."""
        self._launch_server()

    @property
    def grpc_port(self) -> Optional[int]:
        """Grpc port read from the port file, or None if not set."""
        return self._grpc_port

    @property
    def sock_port(self) -> Optional[int]:
        """Socket port read from the port file, or None if not set."""
        return self._sock_port

    @property
    def service_interface(self) -> "ServiceInterface":
        """The service interface created in ``__init__``."""
        return self._service_interface

    def join(self) -> int:
        """Wait for the service process to exit and return its exit code.

        Returns 0 when no process has been launched.
        """
        if self._internal_proc is None:
            return 0
        return self._internal_proc.wait()