-
Notifications
You must be signed in to change notification settings - Fork 3.3k
/
cloud_compute.py
147 lines (114 loc) · 5.32 KB
/
cloud_compute.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
from dataclasses import asdict, dataclass
from typing import Dict, List, Optional, Tuple, Union
from uuid import uuid4
from lightning_app.core.constants import ENABLE_MULTIPLE_WORKS_IN_NON_DEFAULT_CONTAINER
from lightning_app.storage.mount import Mount
# Sentinel stored under the "type" key to mark a dict as a serialized CloudCompute
# (written by CloudCompute.to_dict, checked by from_dict / _maybe_create_cloud_compute).
__CLOUD_COMPUTE_IDENTIFIER__ = "__cloud_compute__"
@dataclass
class _CloudComputeStore:
id: str
component_names: List[str]
def add_component_name(self, new_component_name: str) -> None:
found_index = None
# When the work is being named by the flow, pop its previous names
for index, component_name in enumerate(self.component_names):
if new_component_name.endswith(component_name.replace("root.", "")):
found_index = index
if found_index is not None:
self.component_names[found_index] = new_component_name
else:
if (
len(self.component_names) == 1
and not ENABLE_MULTIPLE_WORKS_IN_NON_DEFAULT_CONTAINER
and self.id != "default"
):
raise Exception(
f"A Cloud Compute can be assigned only to a single Work. Attached to {self.component_names[0]}"
)
self.component_names.append(new_component_name)
def remove(self, new_component_name: str) -> None:
found_index = None
for index, component_name in enumerate(self.component_names):
if new_component_name == component_name:
found_index = index
if found_index is not None:
del self.component_names[found_index]
# Module-level registry of _CloudComputeStore entries.
# NOTE(review): presumably keyed by CloudCompute id; it is populated outside
# this file — confirm against the callers that write to it.
_CLOUD_COMPUTE_STORE = {}
@dataclass
class CloudCompute:
    """Configure the cloud runtime for a lightning work or flow.

    Arguments:
        name: The name of the hardware to use. A full list of supported options can be found in
            :doc:`/core_api/lightning_work/compute`. If you have a request for more hardware options, please contact
            `onprem@lightning.ai <mailto:onprem@lightning.ai>`_.

        disk_size: The disk size in Gigabytes.
            The value you set here will be allocated to the /home folder.

        idle_timeout: The number of seconds to wait before pausing the compute when the work is running and idle.
            This timeout starts whenever your run() method succeeds (or fails).
            If the timeout is reached, the instance pauses until the next run() call happens.

        shm_size: Shared memory size in MiB, backed by RAM. min 512, max 8192, it will auto update in steps of 512.
            For example 1100 will become 1024. If set to zero (the default) will get the default 64MiB inside docker.

        mounts: External data sources which should be mounted into a work as a filesystem at runtime.
    """

    name: str = "default"
    disk_size: int = 0
    idle_timeout: Optional[int] = None
    shm_size: Optional[int] = None
    mounts: Optional[Union[Mount, List[Mount]]] = None
    # Serialization identity; auto-generated when not provided.
    _internal_id: Optional[str] = None

    def __post_init__(self) -> None:
        """Normalize the name, default the shm size, and assign an internal id."""
        _verify_mount_root_dirs_are_unique(self.mounts)
        self.name = self.name.lower()
        if self.shm_size is None:
            # GPU machines get a larger shared-memory default; CPU machines
            # keep 0, i.e. the container runtime's own default.
            self.shm_size = 1024 if "gpu" in self.name else 0

        # All `default` CloudCompute are identified in the same way.
        if self._internal_id is None:
            self._internal_id = self._generate_id()

        # Internal arguments for now.
        self.preemptible = False

    def to_dict(self) -> dict:
        """Serialize to a plain dict, tagged so ``from_dict`` can recognize it."""
        _verify_mount_root_dirs_are_unique(self.mounts)
        return {"type": __CLOUD_COMPUTE_IDENTIFIER__, **asdict(self)}

    @classmethod
    def from_dict(cls, d: dict) -> "CloudCompute":
        """Reconstruct a ``CloudCompute`` from a ``to_dict`` payload.

        The input dict is not modified.

        Raises:
            TypeError: If ``mounts`` is neither ``None``, a dict, nor a list.
        """
        # Fix: copy before popping so the caller's dict is not mutated.
        d = dict(d)
        # NOTE(review): `assert` is stripped under `python -O`; kept as-is to
        # preserve the exception type callers may rely on.
        assert d.pop("type") == __CLOUD_COMPUTE_IDENTIFIER__
        mounts = d.pop("mounts", None)
        if mounts is not None:
            if isinstance(mounts, dict):
                d["mounts"] = Mount(**mounts)
            elif isinstance(mounts, list):
                d["mounts"] = [Mount(**mount) for mount in mounts]
            else:
                raise TypeError(
                    f"mounts argument must be one of [None, Mount, List[Mount]], "
                    f"received {mounts} of type {type(mounts)}"
                )
        _verify_mount_root_dirs_are_unique(d.get("mounts", None))
        return cls(**d)

    @property
    def id(self) -> Optional[str]:
        """The internal identifier of this compute config."""
        return self._internal_id

    def is_default(self) -> bool:
        """Whether this is the default (free-tier) compute."""
        return self.name == "default"

    def _generate_id(self) -> str:
        # All "default" computes share one id so they map to the same store entry;
        # everything else gets a short random id.
        return "default" if self.name == "default" else uuid4().hex[:7]

    def clone(self) -> "CloudCompute":
        """Return a copy of this compute config with a freshly generated internal id."""
        new_dict = self.to_dict()
        new_dict["_internal_id"] = self._generate_id()
        return self.from_dict(new_dict)
def _verify_mount_root_dirs_are_unique(mounts: Union[None, Mount, List[Mount], Tuple[Mount]]) -> None:
if isinstance(mounts, (list, tuple, set)):
mount_paths = [mount.mount_path for mount in mounts]
if len(set(mount_paths)) != len(mount_paths):
raise ValueError("Every Mount attached to a work must have a unique 'mount_path' argument.")
def _maybe_create_cloud_compute(state: Dict) -> Union[CloudCompute, Dict]:
if state and __CLOUD_COMPUTE_IDENTIFIER__ == state.get("type", None):
cloud_compute = CloudCompute.from_dict(state)
return cloud_compute
return state