Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[App] Resolve some bugs from the Training Studio scaling #16114

Merged
merged 40 commits into from Dec 19, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/lightning_app/core/app.py
Expand Up @@ -184,7 +184,8 @@ def __init__(
def _update_index_file(self):
# update index.html,
# this should happen once for all apps before the ui server starts running.
frontend.update_index_file(FRONTEND_DIR, info=self.info, root_path=self.root_path)
if self.root_path:
tchaton marked this conversation as resolved.
Show resolved Hide resolved
frontend.update_index_file(FRONTEND_DIR, info=self.info, root_path=self.root_path)

def get_component_by_name(self, component_name: str):
"""Returns the instance corresponding to the given component name."""
Expand Down Expand Up @@ -353,6 +354,10 @@ def _collect_deltas_from_ui_and_work_queues(self) -> List[Union[Delta, _APIReque
deltas.append(delta)
else:
api_or_command_request_deltas.append(delta)
else:
lantiga marked this conversation as resolved.
Show resolved Hide resolved
break

logger.info(f"Collected {len(deltas)} deltas in {time() - t0}")

if api_or_command_request_deltas:
_process_requests(self, api_or_command_request_deltas)
Expand Down
2 changes: 1 addition & 1 deletion src/lightning_app/core/constants.py
Expand Up @@ -13,7 +13,7 @@ def get_lightning_cloud_url() -> str:

SUPPORTED_PRIMITIVE_TYPES = (type(None), str, int, float, bool)
STATE_UPDATE_TIMEOUT = 0.001
STATE_ACCUMULATE_WAIT = 0.05
STATE_ACCUMULATE_WAIT = 0.15
# Duration in seconds of a moving average of a full flow execution
# beyond which an exception is raised.
FLOW_DURATION_THRESHOLD = 1.0
Expand Down
3 changes: 1 addition & 2 deletions src/lightning_app/core/queues.py
Expand Up @@ -364,12 +364,11 @@ def get(self, timeout: int = None) -> Any:

# timeout is some value - loop until the timeout is reached
start_time = time.time()
timeout += 0.1 # add 0.1 seconds as a safe margin
while (time.time() - start_time) < timeout:
try:
return self._get()
except queue.Empty:
time.sleep(HTTP_QUEUE_REFRESH_INTERVAL)
time.sleep(0.1)

def _get(self):
resp = self.client.post(f"v1/{self.app_id}/{self._name_suffix}", query_params={"action": "pop"})
Expand Down
10 changes: 8 additions & 2 deletions src/lightning_app/utilities/packaging/cloud_compute.py
Expand Up @@ -69,9 +69,9 @@ class CloudCompute:
"""

name: str = "default"
disk_size: int = 0
disk_size: int = 15
idle_timeout: Optional[int] = None
shm_size: Optional[int] = 0
shm_size: Optional[int] = None
mounts: Optional[Union[Mount, List[Mount]]] = None
_internal_id: Optional[str] = None

Expand All @@ -80,6 +80,12 @@ def __post_init__(self) -> None:

self.name = self.name.lower()

if self.shm_size is None:
lantiga marked this conversation as resolved.
Show resolved Hide resolved
if "gpu" in self.name:
self.shm_size = 1024
else:
self.shm_size = 0
tchaton marked this conversation as resolved.
Show resolved Hide resolved

# All `default` CloudCompute are identified in the same way.
if self._internal_id is None:
self._internal_id = self._generate_id()
Expand Down
15 changes: 9 additions & 6 deletions src/lightning_app/utilities/scheduler.py
@@ -1,10 +1,9 @@
import threading
from copy import deepcopy
from datetime import datetime
from typing import Optional

from croniter import croniter
from deepdiff import DeepDiff, Delta
from deepdiff import Delta

from lightning_app.utilities.proxies import ComponentDelta

Expand Down Expand Up @@ -34,11 +33,15 @@ def run_once(self):
next_event = croniter(metadata["cron_pattern"], start_time).get_next(datetime)
# When the event is reached, send a delta to activate scheduling.
if current_date > next_event:
flow = self._app.get_component_by_name(metadata["name"])
previous_state = deepcopy(flow.state)
flow._enable_schedule(call_hash)
component_delta = ComponentDelta(
id=flow.name, delta=Delta(DeepDiff(previous_state, flow.state, verbose_level=2))
id=metadata["name"],
tchaton marked this conversation as resolved.
Show resolved Hide resolved
delta=Delta(
{
"values_changed": {
f"root['calls']['scheduling']['{call_hash}']['running']": {"new_value": True}
}
}
),
)
self._app.delta_queue.put(component_delta)
metadata["start_time"] = next_event.isoformat()
Expand Down
2 changes: 1 addition & 1 deletion tests/tests_app/core/test_lightning_flow.py
Expand Up @@ -655,7 +655,7 @@ def run(self):
def test_scheduling_api():

app = LightningApp(FlowSchedule())
MultiProcessRuntime(app, start_server=True).dispatch()
MultiProcessRuntime(app, start_server=False).dispatch()


def test_lightning_flow():
Expand Down