Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[App] Add CloudMultiProcessBackend to run a child App within the Flow in the cloud #15800

Merged
merged 37 commits into from Nov 24, 2022
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
bfe9965
update
tchaton Nov 22, 2022
926b2df
update
tchaton Nov 23, 2022
91fc683
update
tchaton Nov 23, 2022
ef8a573
update
tchaton Nov 23, 2022
56c0582
update
tchaton Nov 23, 2022
0a2add5
update
tchaton Nov 23, 2022
8fc0e41
update
tchaton Nov 23, 2022
bb5ea69
Merge branch 'master' into enable_vscode_cloud_2
tchaton Nov 23, 2022
3a9ca9d
update
tchaton Nov 23, 2022
698389a
Merge branch 'enable_vscode_cloud_2' of https://github.com/Lightning-…
tchaton Nov 23, 2022
0cd3acf
update
tchaton Nov 23, 2022
496d139
update
tchaton Nov 23, 2022
7c08b9a
update
tchaton Nov 23, 2022
ddd1b19
update
tchaton Nov 23, 2022
6ea46c3
update
tchaton Nov 23, 2022
7c476d3
update
tchaton Nov 23, 2022
c869442
updte
tchaton Nov 23, 2022
d8fa7d5
update
tchaton Nov 23, 2022
0be0119
update
tchaton Nov 23, 2022
e50204b
update
tchaton Nov 23, 2022
982c035
update
tchaton Nov 23, 2022
9ff083d
update
tchaton Nov 24, 2022
ddfab6f
update
tchaton Nov 24, 2022
51004a8
update
tchaton Nov 24, 2022
a4ca231
update
tchaton Nov 24, 2022
f82ad0f
update
tchaton Nov 24, 2022
9e282b2
update
tchaton Nov 24, 2022
c39510d
update
tchaton Nov 24, 2022
41317b6
Update src/lightning_app/CHANGELOG.md
tchaton Nov 24, 2022
16e5738
Update src/lightning_app/utilities/port.py
tchaton Nov 24, 2022
b26fc0c
Update src/lightning_app/utilities/port.py
tchaton Nov 24, 2022
f2f1b15
Update src/lightning_app/utilities/port.py
tchaton Nov 24, 2022
175c46f
Update src/lightning_app/utilities/port.py
tchaton Nov 24, 2022
ec87064
Update src/lightning_app/utilities/port.py
tchaton Nov 24, 2022
04daf57
Update src/lightning_app/utilities/port.py
tchaton Nov 24, 2022
b6b6bb4
Merge branch 'master' into enable_vscode_cloud_2
tchaton Nov 24, 2022
0b25da0
Merge branch 'master' into enable_vscode_cloud_2
tchaton Nov 24, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Expand Up @@ -139,6 +139,7 @@ module = [
"lightning_app.utilities.packaging.cloud_compute",
"lightning_app.utilities.packaging.docker",
"lightning_app.utilities.packaging.lightning_utils",
"lightning_app.utilities.port",
"lightning_app.utilities.proxies",
"lightning_app.utilities.scheduler",
"lightning_app.utilities.state",
Expand Down
3 changes: 3 additions & 0 deletions src/lightning_app/CHANGELOG.md
Expand Up @@ -11,6 +11,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
- Added the CLI command `lightning run model` to launch a `LightningLite` accelerated script ([#15506](https://github.com/Lightning-AI/lightning/pull/15506))


- Added the `CloudMultiProcessingBackend` to run a child App within the Flow in the cloud ([#15800](https://github.com/Lightning-AI/lightning/pull/15800))
tchaton marked this conversation as resolved.
Show resolved Hide resolved


### Changed

- `lightning add ssh-key` CLI command has been transitioned to `lightning create ssh-key` with the same calling signature ([#15761](https://github.com/Lightning-AI/lightning/pull/15761))
Expand Down
2 changes: 1 addition & 1 deletion src/lightning_app/cli/lightning_cli.py
Expand Up @@ -54,7 +54,7 @@ def get_app_url(runtime_type: RuntimeType, *args: Any, need_credits: bool = Fals
action = "?action=add_credits" if need_credits else ""
return f"{get_lightning_cloud_url()}/me/apps/{lit_app.id}{action}"
else:
return "http://127.0.0.1:7501/view"
return os.getenv("APP_SERVER_HOST", "http://127.0.0.1:7501/view")


def main() -> None:
Expand Down
6 changes: 5 additions & 1 deletion src/lightning_app/core/app.py
Expand Up @@ -138,6 +138,7 @@ def __init__(
self._schedules: Dict[str, Dict] = {}
self.threads: List[threading.Thread] = []
self.exception = None
self.collect_changes: bool = True

# NOTE: Checkpointing is disabled by default for the time being. We
# will enable it when resuming from full checkpoint is supported. Also,
Expand Down Expand Up @@ -362,11 +363,14 @@ def _collect_deltas_from_ui_and_work_queues(self) -> List[Union[Delta, _APIReque
delta.raise_errors = False
return deltas

def maybe_apply_changes(self) -> bool:
def maybe_apply_changes(self) -> None:
"""Get the deltas from both the flow queue and the work queue, merge the two deltas and update the
state."""
self._send_flow_to_work_deltas(self.state)

if not self.collect_changes:
return None

deltas = self._collect_deltas_from_ui_and_work_queues()

if not deltas:
Expand Down
12 changes: 8 additions & 4 deletions src/lightning_app/core/constants.py
Expand Up @@ -3,6 +3,8 @@

import lightning_cloud.env

from lightning_app.utilities.port import _find_lit_app_port


def get_lightning_cloud_url() -> str:
# DO NOT CHANGE!
Expand All @@ -19,7 +21,8 @@ def get_lightning_cloud_url() -> str:
FLOW_DURATION_SAMPLES = 5

APP_SERVER_HOST = os.getenv("LIGHTNING_APP_STATE_URL", "http://127.0.0.1")
APP_SERVER_PORT = 7501
APP_SERVER_IN_CLOUD = "http://lightningapp" in APP_SERVER_HOST
APP_SERVER_PORT = _find_lit_app_port(7501)
APP_STATE_MAX_SIZE_BYTES = 1024 * 1024 # 1 MB

CLOUD_QUEUE_TYPE = os.getenv("LIGHTNING_CLOUD_QUEUE_TYPE", None)
Expand Down Expand Up @@ -52,9 +55,6 @@ def get_lightning_cloud_url() -> str:

# EXPERIMENTAL: ENV VARIABLES TO ENABLE MULTIPLE WORKS IN THE SAME MACHINE
DEFAULT_NUMBER_OF_EXPOSED_PORTS = int(os.getenv("DEFAULT_NUMBER_OF_EXPOSED_PORTS", "50"))
ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER = bool(
int(os.getenv("ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER", "0"))
) # Note: This is disabled for the time being.
ENABLE_MULTIPLE_WORKS_IN_NON_DEFAULT_CONTAINER = bool(
int(os.getenv("ENABLE_MULTIPLE_WORKS_IN_NON_DEFAULT_CONTAINER", "0"))
) # This isn't used in the cloud yet.
Expand All @@ -71,3 +71,7 @@ def get_lightning_cloud_url() -> str:
)
ENABLE_STATE_WEBSOCKET = bool(int(os.getenv("ENABLE_STATE_WEBSOCKET", "0")))
ENABLE_UPLOAD_ENDPOINT = bool(int(os.getenv("ENABLE_UPLOAD_ENDPOINT", "1")))


def enable_multiple_works_in_default_container() -> bool:
    """Tell whether the ``ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER`` flag is switched on.

    The environment variable is read each time the function is called. It is parsed as an
    integer (missing means ``"0"``), and any non-zero value counts as enabled.
    """
    raw_flag = os.environ.get("ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER", "0")
    return int(raw_flag) != 0
5 changes: 4 additions & 1 deletion src/lightning_app/runners/backends/__init__.py
@@ -1,9 +1,10 @@
from enum import Enum

from lightning_app.core.constants import APP_SERVER_IN_CLOUD
from lightning_app.runners.backends.backend import Backend
from lightning_app.runners.backends.cloud import CloudBackend
from lightning_app.runners.backends.docker import DockerBackend
from lightning_app.runners.backends.mp_process import MultiProcessingBackend
from lightning_app.runners.backends.mp_process import CloudMultiProcessingBackend, MultiProcessingBackend


class BackendType(Enum):
Expand All @@ -13,6 +14,8 @@ class BackendType(Enum):

def get_backend(self, entrypoint_file: str) -> "Backend":
if self == BackendType.MULTIPROCESSING:
if APP_SERVER_IN_CLOUD:
return CloudMultiProcessingBackend(entrypoint_file)
return MultiProcessingBackend(entrypoint_file)
elif self == BackendType.DOCKER:
return DockerBackend(entrypoint_file)
Expand Down
22 changes: 22 additions & 0 deletions src/lightning_app/runners/backends/mp_process.py
Expand Up @@ -6,6 +6,7 @@
from lightning_app.runners.backends.backend import Backend, WorkManager
from lightning_app.utilities.enum import WorkStageStatus
from lightning_app.utilities.network import _check_service_url_is_ready
from lightning_app.utilities.port import disable_port, enable_port
from lightning_app.utilities.proxies import ProxyWorkRun, WorkRunner


Expand Down Expand Up @@ -83,3 +84,24 @@ def resolve_url(self, app, base_url: Optional[str] = None) -> None:
def stop_work(self, app, work: "lightning_app.LightningWork") -> None:
    """Kill the work manager registered for ``work``.

    The manager is looked up in ``app.processes`` under ``work.name``.
    """
    app.processes[work.name].kill()


class CloudMultiProcessingBackend(MultiProcessingBackend):
    """Multi-processing backend that opens a cloud port for every work it creates."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Note: Track the open ports to close them on termination.
        self.ports = []

    def create_work(self, app, work) -> None:
        """Open a port through ``enable_port`` and bind ``work`` to it before creating it.

        The work listens on ``0.0.0.0`` at the opened port, and its future URL is the
        ``https`` address of the host reported for that port.
        """
        work._host = "0.0.0.0"
        nc = enable_port()
        self.ports.append(nc.port)
        work._port = nc.port
        work._future_url = f"https://{nc.host}"
        return super().create_work(app, work)

    def stop_work(self, app, work: "lightning_app.LightningWork") -> None:
        """Close the port of ``work`` and stop the work.

        The work is stopped even when closing the port raises; the error still
        propagates afterwards. A port whose closing failed stays in ``self.ports``.
        """
        try:
            disable_port(work._port)
            self.ports = [port for port in self.ports if port != work._port]
        finally:
            # Without this, a failing port request would leave the work process running.
            super().stop_work(app, work)
6 changes: 3 additions & 3 deletions src/lightning_app/runners/cloud.py
Expand Up @@ -51,7 +51,7 @@
DEFAULT_NUMBER_OF_EXPOSED_PORTS,
DISABLE_DEPENDENCY_CACHE,
ENABLE_APP_COMMENT_COMMAND_EXECUTION,
ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER,
enable_multiple_works_in_default_container,
ENABLE_MULTIPLE_WORKS_IN_NON_DEFAULT_CONTAINER,
ENABLE_PULLING_STATE_ENDPOINT,
ENABLE_PUSHING_STATE_ENDPOINT,
Expand Down Expand Up @@ -243,7 +243,7 @@ def dispatch(
if self.run_app_comment_commands or ENABLE_APP_COMMENT_COMMAND_EXECUTION:
v1_env_vars.append(V1EnvVar(name="ENABLE_APP_COMMENT_COMMAND_EXECUTION", value="1"))

if ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER:
if enable_multiple_works_in_default_container():
v1_env_vars.append(V1EnvVar(name="ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER", value="1"))

if ENABLE_MULTIPLE_WORKS_IN_NON_DEFAULT_CONTAINER:
Expand Down Expand Up @@ -303,7 +303,7 @@ def dispatch(
)

network_configs: Optional[List[V1NetworkConfig]] = None
if ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER:
if enable_multiple_works_in_default_container():
network_configs = []
initial_port = 8080 + 1 + len(frontend_specs)
for _ in range(DEFAULT_NUMBER_OF_EXPOSED_PORTS):
Expand Down
13 changes: 13 additions & 0 deletions src/lightning_app/runners/multiprocess.py
Expand Up @@ -5,6 +5,7 @@

from lightning_app.api.http_methods import _add_tags_to_api, _validate_api
from lightning_app.core.api import start_server
from lightning_app.core.constants import APP_SERVER_IN_CLOUD
from lightning_app.runners.backends import Backend
from lightning_app.runners.runtime import Runtime
from lightning_app.storage.orchestrator import StorageOrchestrator
Expand All @@ -13,6 +14,7 @@
from lightning_app.utilities.component import _set_flow_context, _set_frontend_context
from lightning_app.utilities.load_app import extract_metadata_from_app
from lightning_app.utilities.network import find_free_network_port
from lightning_app.utilities.port import disable_port


@dataclass
Expand All @@ -31,6 +33,9 @@ def dispatch(self, *args: Any, on_before_run: Optional[Callable] = None, **kwarg
try:
_set_flow_context()

# Note: In case the runtime is used in the cloud.
self.host = "0.0.0.0" if APP_SERVER_IN_CLOUD else self.host

self.app.backend = self.backend
self.backend._prepare_queues(self.app)
self.backend.resolve_url(self.app, "http://127.0.0.1")
Expand Down Expand Up @@ -109,3 +114,11 @@ def dispatch(self, *args: Any, on_before_run: Optional[Callable] = None, **kwarg
raise
finally:
self.terminate()

def terminate(self):
    """Close the cloud ports opened for this App, then run the parent termination.

    When ``APP_SERVER_IN_CLOUD`` is set, ``self.port`` and every port listed in the
    backend's ``ports`` attribute (if it has one) are closed with ``disable_port``.
    The parent ``terminate`` always runs, even if closing a port raises; the error
    still propagates afterwards.
    """
    try:
        if APP_SERVER_IN_CLOUD:
            # Close all the ports open for the App within the App.
            ports = [self.port] + getattr(self.backend, "ports", [])
            for port in ports:
                disable_port(port)
    finally:
        # A failing port request must not prevent the runtime from being torn down.
        super().terminate()
140 changes: 140 additions & 0 deletions src/lightning_app/utilities/port.py
@@ -0,0 +1,140 @@
import os
from typing import Optional

from lightning_cloud.openapi import AppinstancesIdBody, Externalv1LightningappInstance, V1NetworkConfig

from lightning_app.utilities.network import LightningClient


def _find_lit_app_port(default_port: int) -> int:
    """Return the port the App server should use.

    ``default_port`` is returned untouched unless ``LIGHTNING_CLOUD_APP_ID`` and
    ``LIGHTNING_CLOUD_PROJECT_ID`` are set and ``ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER``
    is non-zero. Otherwise a request is made to the cloud controlplane (through
    ``enable_port``) to find a disabled port of the flow, enable it and return it.

    Args:
        default_port: Port to use when no cloud port has to be opened.

    Returns:
        ``default_port``, or the port of the network config that was just enabled.

    Raises:
        ValueError: If ``ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER`` is not an integer.
        RuntimeError: Propagated from ``enable_port`` when no disabled port is left.

    Side effects:
        When a cloud port is opened, ``APP_SERVER_HOST`` is set in ``os.environ``.
    """
    app_id = os.getenv("LIGHTNING_CLOUD_APP_ID")
    project_id = os.getenv("LIGHTNING_CLOUD_PROJECT_ID")
    multiple_works_enabled = bool(int(os.getenv("ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER", "0")))

    if not (app_id and project_id and multiple_works_enabled):
        return default_port

    # `enable_port` runs the same lookup / enable / update sequence this function
    # used to duplicate, so delegate instead of keeping two copies in sync.
    found_nc = enable_port()

    # Note: This is required for the framework to know we need to use the CloudMultiProcessRuntime.
    os.environ["APP_SERVER_HOST"] = f"https://{found_nc.host}"

    return found_nc.port


def enable_port() -> V1NetworkConfig:
    """Make a request to the cloud controlplane to open a port of the flow.

    The app instance is identified by the ``LIGHTNING_CLOUD_APP_ID`` and
    ``LIGHTNING_CLOUD_PROJECT_ID`` environment variables. The first network config
    of its spec that is not enabled is switched on and the spec is sent back.

    Returns:
        The network config that was enabled.

    Raises:
        RuntimeError: If the environment variables are missing, if the app instance
            is not listed in the project, or if every port is already enabled.
    """
    app_id = os.getenv("LIGHTNING_CLOUD_APP_ID")
    project_id = os.getenv("LIGHTNING_CLOUD_PROJECT_ID")

    if not app_id or not project_id:
        raise RuntimeError("The app_id and project_id should be defined.")

    client = LightningClient()
    list_apps_resp = client.lightningapp_instance_service_list_lightningapp_instances(project_id=project_id)
    lit_app: Optional[Externalv1LightningappInstance] = None

    for lightningapp in list_apps_resp.lightningapps:
        if lightningapp.id == app_id:
            lit_app = lightningapp

    # Explicit check instead of `assert`, which is stripped when Python runs with -O.
    if not lit_app:
        raise RuntimeError(
            "App was not found. Please open an issue at https://github.com/lightning-AI/lightning/issues."
        )

    found_nc = next((nc for nc in lit_app.spec.network_config if not nc.enable), None)

    # Fail before contacting the controlplane: there is nothing to update.
    if not found_nc:
        raise RuntimeError(
            "No available port was found. Please open an issue at https://github.com/lightning-AI/lightning/issues."
        )

    found_nc.enable = True

    client.lightningapp_instance_service_update_lightningapp_instance(
        project_id=project_id,
        id=lit_app.id,
        body=AppinstancesIdBody(name=lit_app.name, spec=lit_app.spec),
    )

    return found_nc


def disable_port(port: int, ignore_disabled: bool = True) -> None:
    """Make a request to the cloud controlplane to close a port of the flow.

    The app instance is identified by the ``LIGHTNING_CLOUD_APP_ID`` and
    ``LIGHTNING_CLOUD_PROJECT_ID`` environment variables.

    Args:
        port: Port of the network config to disable.
        ignore_disabled: When ``False``, closing a port that is already disabled raises.

    Raises:
        RuntimeError: If the environment variables are missing, if the app instance
            is not listed in the project, or if the port is already disabled and
            ``ignore_disabled`` is ``False``.
        ValueError: If no network config of the app uses ``port``.
    """
    app_id = os.getenv("LIGHTNING_CLOUD_APP_ID")
    project_id = os.getenv("LIGHTNING_CLOUD_PROJECT_ID")

    if not app_id or not project_id:
        raise RuntimeError("The app_id and project_id should be defined.")

    client = LightningClient()
    list_apps_resp = client.lightningapp_instance_service_list_lightningapp_instances(project_id=project_id)
    lit_app: Optional[Externalv1LightningappInstance] = None

    for lightningapp in list_apps_resp.lightningapps:
        if lightningapp.id == app_id:
            lit_app = lightningapp

    # Explicit check instead of `assert`, which is stripped when Python runs with -O.
    if not lit_app:
        raise RuntimeError(
            "App was not found. Please open an issue at https://github.com/lightning-AI/lightning/issues."
        )

    found_nc = next((nc for nc in lit_app.spec.network_config if nc.port == port), None)

    # Fail before contacting the controlplane: an unknown port leaves nothing to update.
    if not found_nc:
        ports = [nc.port for nc in lit_app.spec.network_config]
        raise ValueError(f"The provided port doesn't exists. Available ports are {ports}.")

    if not found_nc.enable and not ignore_disabled:
        raise RuntimeError(f"The port {port} was already disabled.")

    found_nc.enable = False

    client.lightningapp_instance_service_update_lightningapp_instance(
        project_id=project_id,
        id=lit_app.id,
        body=AppinstancesIdBody(name=lit_app.name, spec=lit_app.spec),
    )