-
Notifications
You must be signed in to change notification settings - Fork 3.3k
/
singleprocess.py
62 lines (50 loc) · 2.22 KB
/
singleprocess.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import multiprocessing as mp
import os
from typing import Any
import click
from lightning_app.core.api import start_server
from lightning_app.core.queues import QueuingSystem
from lightning_app.runners.runtime import Runtime
from lightning_app.utilities.app_helpers import _is_headless
from lightning_app.utilities.load_app import extract_metadata_from_app
class SingleProcessRuntime(Runtime):
"""Runtime to launch the LightningApp into a single process."""
def __post_init__(self):
pass
def dispatch(self, *args, open_ui: bool = True, **kwargs: Any):
"""Method to dispatch and run the LightningApp."""
queue = QueuingSystem.SINGLEPROCESS
self.app.delta_queue = queue.get_delta_queue()
self.app.state_update_queue = queue.get_caller_queue(work_name="single_worker")
self.app.error_queue = queue.get_error_queue()
if self.start_server:
self.app.should_publish_changes_to_api = True
self.app.api_publish_state_queue = QueuingSystem.MULTIPROCESS.get_api_state_publish_queue()
self.app.api_delta_queue = QueuingSystem.MULTIPROCESS.get_api_delta_queue()
has_started_queue = QueuingSystem.MULTIPROCESS.get_has_server_started_queue()
kwargs = dict(
host=self.host,
port=self.port,
api_publish_state_queue=self.app.api_publish_state_queue,
api_delta_queue=self.app.api_delta_queue,
has_started_queue=has_started_queue,
spec=extract_metadata_from_app(self.app),
root_path=self.app.root_path,
)
server_proc = mp.Process(target=start_server, kwargs=kwargs)
self.processes["server"] = server_proc
server_proc.start()
# wait for server to be ready.
has_started_queue.get()
if open_ui and not _is_headless(self.app):
click.launch(self._get_app_url())
try:
self.app._run()
except KeyboardInterrupt:
self.terminate()
raise
finally:
self.terminate()
@staticmethod
def _get_app_url() -> str:
return os.getenv("APP_SERVER_HOST", "http://127.0.0.1:7501/view")