Skip to content

Commit

Permalink
[App] Support running on multiple clusters (#16016)
Browse files Browse the repository at this point in the history
  • Loading branch information
ethanwharris committed Dec 14, 2022
1 parent 9a24635 commit d3a7226
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 141 deletions.
213 changes: 120 additions & 93 deletions src/lightning_app/runners/cloud.py
@@ -1,13 +1,13 @@
import fnmatch
import json
import random
import re
import string
import sys
import time
from dataclasses import dataclass
from functools import partial
from pathlib import Path
from textwrap import dedent
from typing import Any, List, Optional, Union

import click
Expand All @@ -20,6 +20,7 @@
Externalv1LightningappInstance,
Gridv1ImageSpec,
V1BuildSpec,
V1ClusterType,
V1DependencyFileInfo,
V1Drive,
V1DriveSpec,
Expand Down Expand Up @@ -212,8 +213,6 @@ def dispatch(
# Determine the root of the project: Start at the entrypoint_file and look for nearby Lightning config files,
# going up the directory structure. The root of the project is where the Lightning config file is located.

# TODO: verify lightning version
# _verify_lightning_version()
config_file = _get_config_file(self.entrypoint_file)
app_config = AppConfig.load_from_file(config_file) if config_file.exists() else AppConfig()
root = Path(self.entrypoint_file).absolute().parent
Expand Down Expand Up @@ -242,10 +241,6 @@ def dispatch(
# Override the name if provided by the CLI
app_config.name = name

if cluster_id:
# Override the cluster ID if provided by the CLI
app_config.cluster_id = cluster_id

print(f"The name of the app is: {app_config.name}")

v1_env_vars = [V1EnvVar(name=k, value=v) for k, v in self.env_vars.items()]
Expand Down Expand Up @@ -307,17 +302,92 @@ def dispatch(
project = _get_project(self.backend.client)

try:
list_apps_resp = self.backend.client.lightningapp_v2_service_list_lightningapps_v2(
project_id=project.project_id, name=app_config.name
if cluster_id is not None:
# Verify that the cluster exists
list_clusters_resp = self.backend.client.cluster_service_list_clusters()
cluster_ids = [cluster.id for cluster in list_clusters_resp.clusters]
if cluster_id not in cluster_ids:
raise ValueError(f"You requested to run on cluster {cluster_id}, but that cluster doesn't exist.")

self._ensure_cluster_project_binding(project.project_id, cluster_id)

# Resolve the app name, instance, and cluster ID
existing_instance = None
app_name = app_config.name

# List existing instances
# TODO: Add pagination, otherwise this could break if users have a lot of apps.
find_instances_resp = self.backend.client.lightningapp_instance_service_list_lightningapp_instances(
project_id=project.project_id
)
if list_apps_resp.lightningapps:
# There can be only one app with unique project_id<>name pair
lit_app = list_apps_resp.lightningapps[0]
else:
app_body = Body7(name=app_config.name, can_download_source_code=True)

# Seach for instances with the given name (possibly with some random characters appended)
pattern = re.escape(f"{app_name}-") + ".{4}"
instances = [
lightningapp
for lightningapp in find_instances_resp.lightningapps
if lightningapp.name == app_name or (re.fullmatch(pattern, lightningapp.name) is not None)
]

# If instances exist and cluster is None, mimic cluster selection logic to choose a default
if cluster_id is None and len(instances) > 0:
# Determine the cluster ID
cluster_id = self._get_default_cluster(project.project_id)

# If an instance exists on the cluster with the same base name - restart it
for instance in instances:
if instance.spec.cluster_id == cluster_id:
existing_instance = instance
break

# If instances exist but not on the cluster - choose a randomised name
if len(instances) > 0 and existing_instance is None:
name_exists = True
while name_exists:
random_name = self._randomise_name(app_name)
name_exists = any([instance.name == random_name for instance in instances])

app_name = random_name

# Create the app if it doesn't exist
if existing_instance is None:
app_body = Body7(name=app_name, can_download_source_code=True)
lit_app = self.backend.client.lightningapp_v2_service_create_lightningapp_v2(
project_id=project.project_id, body=app_body
)
app_id = lit_app.id
else:
app_id = existing_instance.spec.app_id

# check if user has sufficient credits to run an app
# if so set the desired state to running otherwise, create the app in stopped state,
# and open the admin ui to add credits and running the app.
has_sufficient_credits = self._project_has_sufficient_credits(project, app=self.app)
app_release_desired_state = (
V1LightningappInstanceState.RUNNING if has_sufficient_credits else V1LightningappInstanceState.STOPPED
)
if not has_sufficient_credits:
logger.warn("You may need Lightning credits to run your apps on the cloud.")

# Stop the instance if it isn't stopped yet
if existing_instance and existing_instance.status.phase != V1LightningappInstanceState.STOPPED:
# TODO(yurij): Implement release switching in the UI and remove this
# We can only switch release of the stopped instance
existing_instance = self.backend.client.lightningapp_instance_service_update_lightningapp_instance(
project_id=project.project_id,
id=existing_instance.id,
body=Body3(spec=V1LightningappInstanceSpec(desired_state=V1LightningappInstanceState.STOPPED)),
)
# wait for the instance to stop for up to 150 seconds
for _ in range(150):
existing_instance = self.backend.client.lightningapp_instance_service_get_lightningapp_instance(
project_id=project.project_id, id=existing_instance.id
)
if existing_instance.status.phase == V1LightningappInstanceState.STOPPED:
break
time.sleep(1)
if existing_instance.status.phase != V1LightningappInstanceState.STOPPED:
raise RuntimeError("Failed to stop the existing instance.")

network_configs: Optional[List[V1NetworkConfig]] = None
if enable_multiple_works_in_default_container():
Expand All @@ -332,90 +402,18 @@ def dispatch(
)
initial_port += 1

# check if user has sufficient credits to run an app
# if so set the desired state to running otherwise, create the app in stopped state,
# and open the admin ui to add credits and running the app.
has_sufficient_credits = self._project_has_sufficient_credits(project, app=self.app)
app_release_desired_state = (
V1LightningappInstanceState.RUNNING if has_sufficient_credits else V1LightningappInstanceState.STOPPED
)
if not has_sufficient_credits:
logger.warn("You may need Lightning credits to run your apps on the cloud.")

# right now we only allow a single instance of the app
find_instances_resp = self.backend.client.lightningapp_instance_service_list_lightningapp_instances(
project_id=project.project_id, app_id=lit_app.id
)

queue_server_type = V1QueueServerType.UNSPECIFIED
if CLOUD_QUEUE_TYPE == "http":
queue_server_type = V1QueueServerType.HTTP
elif CLOUD_QUEUE_TYPE == "redis":
queue_server_type = V1QueueServerType.REDIS

existing_instance: Optional[Externalv1LightningappInstance] = None
if find_instances_resp.lightningapps:
existing_instance = find_instances_resp.lightningapps[0]

if not app_config.cluster_id:
# Re-run the app on the same cluster
app_config.cluster_id = existing_instance.spec.cluster_id

if existing_instance.status.phase != V1LightningappInstanceState.STOPPED:
# TODO(yurij): Implement release switching in the UI and remove this
# We can only switch release of the stopped instance
existing_instance = self.backend.client.lightningapp_instance_service_update_lightningapp_instance(
project_id=project.project_id,
id=existing_instance.id,
body=Body3(spec=V1LightningappInstanceSpec(desired_state=V1LightningappInstanceState.STOPPED)),
)
# wait for the instance to stop for up to 150 seconds
for _ in range(150):
existing_instance = self.backend.client.lightningapp_instance_service_get_lightningapp_instance(
project_id=project.project_id, id=existing_instance.id
)
if existing_instance.status.phase == V1LightningappInstanceState.STOPPED:
break
time.sleep(1)
if existing_instance.status.phase != V1LightningappInstanceState.STOPPED:
raise RuntimeError("Failed to stop the existing instance.")

if app_config.cluster_id is not None:
# Verify that the cluster exists
list_clusters_resp = self.backend.client.cluster_service_list_clusters()
cluster_ids = [cluster.id for cluster in list_clusters_resp.clusters]
if app_config.cluster_id not in cluster_ids:
if cluster_id:
msg = f"You requested to run on cluster {cluster_id}, but that cluster doesn't exist."
else:
msg = (
f"Your app last ran on cluster {app_config.cluster_id}, but that cluster "
"doesn't exist anymore."
)
raise ValueError(msg)
if existing_instance and existing_instance.spec.cluster_id != app_config.cluster_id:
raise ValueError(
dedent(
f"""\
An app names {app_config.name} is already running on cluster {existing_instance.spec.cluster_id}, and you requested it to run on cluster {app_config.cluster_id}.
In order to proceed, please either:
a. rename the app to run on {app_config.cluster_id} with the --name option
lightning run app {app_entrypoint_file} --name (new name) --cloud --cluster-id {app_config.cluster_id}
b. delete the app running on {existing_instance.spec.cluster_id} in the UI before running this command.
""" # noqa: E501
)
)

if app_config.cluster_id is not None:
self._ensure_cluster_project_binding(project.project_id, app_config.cluster_id)

release_body = Body8(
app_entrypoint_file=app_spec.app_entrypoint_file,
enable_app_server=app_spec.enable_app_server,
flow_servers=app_spec.flow_servers,
image_spec=app_spec.image_spec,
cluster_id=app_config.cluster_id,
cluster_id=cluster_id,
network_config=network_configs,
works=works,
local_source=True,
Expand All @@ -426,14 +424,13 @@ def dispatch(

# create / upload the new app release
lightning_app_release = self.backend.client.lightningapp_v2_service_create_lightningapp_release(
project_id=project.project_id, app_id=lit_app.id, body=release_body
project_id=project.project_id, app_id=app_id, body=release_body
)

if lightning_app_release.source_upload_url == "":
raise RuntimeError("The source upload url is empty.")

if getattr(lightning_app_release, "cluster_id", None):
app_config.cluster_id = lightning_app_release.cluster_id
logger.info(f"Running app on {lightning_app_release.cluster_id}")

# Save the config for re-runs
Expand All @@ -442,7 +439,7 @@ def dispatch(
repo.package()
repo.upload(url=lightning_app_release.source_upload_url)

if find_instances_resp.lightningapps:
if existing_instance is not None:
lightning_app_instance = (
self.backend.client.lightningapp_instance_service_update_lightningapp_instance_release(
project_id=project.project_id,
Expand All @@ -466,12 +463,12 @@ def dispatch(
lightning_app_instance = (
self.backend.client.lightningapp_v2_service_create_lightningapp_release_instance(
project_id=project.project_id,
app_id=lit_app.id,
app_id=app_id,
id=lightning_app_release.id,
body=Body9(
cluster_id=app_config.cluster_id,
cluster_id=cluster_id,
desired_state=app_release_desired_state,
name=lit_app.name,
name=app_name,
env=v1_env_vars,
queue_server_type=queue_server_type,
),
Expand Down Expand Up @@ -504,6 +501,36 @@ def _ensure_cluster_project_binding(self, project_id: str, cluster_id: str):
body=V1ProjectClusterBinding(cluster_id=cluster_id, project_id=project_id),
)

def _get_default_cluster(self, project_id: str) -> str:
"""This utility implements a minimal version of the cluster selection logic used in the cloud.
TODO: This should be requested directly from the platform.
"""
cluster_bindings = self.backend.client.projects_service_list_project_cluster_bindings(
project_id=project_id
).clusters

if not cluster_bindings:
raise ValueError(f"No clusters are bound to the project {project_id}.")

if len(cluster_bindings) == 1:
return cluster_bindings[0].cluster_id

clusters = [
self.backend.client.cluster_service_get_cluster(cluster_binding.cluster_id)
for cluster_binding in cluster_bindings
]

# Filter global clusters
clusters = [cluster for cluster in clusters if cluster.spec.cluster_type == V1ClusterType.GLOBAL]

return random.choice(clusters).id

@staticmethod
def _randomise_name(app_name: str) -> str:
letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
return app_name + "-" + "".join(random.sample(letters, 4))

@staticmethod
def _check_uploaded_folder(root: Path, repo: LocalSourceCodeDir) -> None:
"""This method is used to inform the users if their folder files are large and how to filter them."""
Expand Down
5 changes: 3 additions & 2 deletions src/lightning_app/utilities/packaging/app_config.py
@@ -1,6 +1,6 @@
import pathlib
from dataclasses import asdict, dataclass, field
from typing import Optional, Union
from typing import Union

import yaml

Expand All @@ -18,7 +18,6 @@ class AppConfig:
"""

name: str = field(default_factory=get_unique_name)
cluster_id: Optional[str] = field(default=None)

def save_to_file(self, path: Union[str, pathlib.Path]) -> None:
"""Save the configuration to the given file in YAML format."""
Expand All @@ -35,6 +34,8 @@ def load_from_file(cls, path: Union[str, pathlib.Path]) -> "AppConfig":
"""Load the configuration from the given file."""
with open(path) as file:
config = yaml.safe_load(file)
# Ignore `cluster_id` without error for backwards compatibility.
config.pop("cluster_id", None)
return cls(**config)

@classmethod
Expand Down
7 changes: 3 additions & 4 deletions tests/tests_app/cli/test_cloud_cli.py
Expand Up @@ -11,7 +11,6 @@
from lightning_cloud.openapi import (
V1LightningappV2,
V1ListLightningappInstancesResponse,
V1ListLightningappsV2Response,
V1ListMembershipsResponse,
V1Membership,
)
Expand Down Expand Up @@ -102,8 +101,8 @@ def __init__(self, *args, create_response, **kwargs):
super().__init__()
self.create_response = create_response

def lightningapp_v2_service_list_lightningapps_v2(self, *args, **kwargs):
return V1ListLightningappsV2Response(lightningapps=[V1LightningappV2(id="my_app", name="app")])
def lightningapp_v2_service_create_lightningapp_v2(self, *args, **kwargs):
return V1LightningappV2(id="my_app", name="app")

def lightningapp_v2_service_create_lightningapp_release(self, project_id, app_id, body):
assert project_id == "test-project-id"
Expand Down Expand Up @@ -183,7 +182,7 @@ def __init__(self, *args, message, **kwargs):
super().__init__()
self.message = message

def lightningapp_v2_service_list_lightningapps_v2(self, *args, **kwargs):
def lightningapp_instance_service_list_lightningapp_instances(self, *args, **kwargs):
raise ApiException(
http_resp=HttpHeaderDict(
data=self.message,
Expand Down

0 comments on commit d3a7226

Please sign in to comment.