Skip to content

Commit

Permalink
[App] Add ready property to the flow (#15921)
Browse files Browse the repository at this point in the history
  • Loading branch information
tchaton committed Dec 6, 2022
1 parent 77006a2 commit 852089e
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 17 deletions.
6 changes: 6 additions & 0 deletions examples/app_boring/app_dynamic.py
Expand Up @@ -37,6 +37,12 @@ def __init__(self):
super().__init__()
self.dict = Dict()

@property
def ready(self) -> bool:
if "dst_w" in self.dict:
return self.dict["dst_w"].url != ""
return False

def run(self):
# create dynamically the source_work at runtime
if "src_w" not in self.dict:
Expand Down
3 changes: 3 additions & 0 deletions src/lightning_app/CHANGELOG.md
Expand Up @@ -12,10 +12,13 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
- Added the CLI command `lightning delete app` to delete a lightning app on the cloud ([#15783](https://github.com/Lightning-AI/lightning/pull/15783))

- Show a message when `BuildConfig(requirements=[...])` is passed but a `requirements.txt` file is already present in the Work ([#15799](https://github.com/Lightning-AI/lightning/pull/15799))

- Show a message when `BuildConfig(dockerfile="...")` is passed but a `Dockerfile` file is already present in the Work ([#15799](https://github.com/Lightning-AI/lightning/pull/15799))

- Added a CloudMultiProcessBackend which enables running a child App from within the Flow in the cloud ([#15800](https://github.com/Lightning-AI/lightning/pull/15800))

- Added the property `ready` of the LightningFlow to inform when the `Open App` should be visible ([#15921](https://github.com/Lightning-AI/lightning/pull/15921))


### Changed

Expand Down
14 changes: 9 additions & 5 deletions src/lightning_app/core/app.py
Expand Up @@ -142,6 +142,9 @@ def __init__(
self.exception = None
self.collect_changes: bool = True

# TODO: Enable ready locally for opening the UI.
self.ready = False

# NOTE: Checkpointing is disabled by default for the time being. We
# will enable it when resuming from full checkpoint is supported. Also,
# we will need to revisit the logic at _should_snapshot, since right now
Expand Down Expand Up @@ -446,6 +449,9 @@ def run_once(self):
done = True
self.stage = AppStage.STOPPING

if not self.ready:
self.ready = self.root.ready

self._last_run_time = time() - t0

self.on_run_once_end()
Expand Down Expand Up @@ -480,13 +486,11 @@ def _run(self) -> bool:
"""
self._original_state = deepcopy(self.state)
done = False
self.ready = self.root.ready

self._start_with_flow_works()

if self.should_publish_changes_to_api and self.api_publish_state_queue:
logger.debug("Publishing the state with changes")
# Push two states to optimize start in the cloud.
self.api_publish_state_queue.put(self.state_vars)
if self.ready and self.should_publish_changes_to_api and self.api_publish_state_queue:
self.api_publish_state_queue.put(self.state_vars)

self._reset_run_time_monitor()
Expand All @@ -496,7 +500,7 @@ def _run(self) -> bool:

self._update_run_time_monitor()

if self._has_updated and self.should_publish_changes_to_api and self.api_publish_state_queue:
if self.ready and self._has_updated and self.should_publish_changes_to_api and self.api_publish_state_queue:
self.api_publish_state_queue.put(self.state_vars)

self._has_updated = False
Expand Down
6 changes: 6 additions & 0 deletions src/lightning_app/core/flow.py
Expand Up @@ -230,6 +230,12 @@ def __getattr__(self, item):
return Path.from_dict(self._paths[item])
return self.__getattribute__(item)

@property
def ready(self) -> bool:
"""Override to customize when your App should be ready."""
flows = self.flows
return all(flow.ready for flow in flows.values()) if flows else True

@property
def changes(self):
return self._changes.copy()
Expand Down
12 changes: 1 addition & 11 deletions src/lightning_app/testing/testing.py
Expand Up @@ -362,17 +362,7 @@ def run_app_in_cloud(
except playwright._impl._api_types.TimeoutError:
print("'Create Project' dialog not visible, skipping.")

admin_page.locator(f"role=link[name='{name}']").click()
sleep(5)
# Scroll to the bottom of the page. Used to capture all logs.
admin_page.evaluate(
"""
var intervalID = setInterval(function () {
var scrollingElement = (document.scrollingElement || document.body);
scrollingElement.scrollTop = scrollingElement.scrollHeight;
}, 200);
"""
)
admin_page.locator(f'[data-cy="{name}"]').click()

client = LightningClient()
project = _get_project(client)
Expand Down
46 changes: 45 additions & 1 deletion tests/tests_app/core/test_lightning_flow.py
Expand Up @@ -3,8 +3,9 @@
from collections import Counter
from copy import deepcopy
from dataclasses import dataclass
from functools import partial
from time import time
from unittest.mock import ANY
from unittest.mock import ANY, MagicMock

import pytest
from deepdiff import DeepDiff, Delta
Expand Down Expand Up @@ -859,3 +860,46 @@ def test_lightning_flow_flows_and_works():
"root.flows_dict.a.w",
"root.flows_list.0.w",
]


class WorkReady(LightningWork):
def __init__(self):
super().__init__(parallel=True)
self.counter = 0

def run(self):
self.counter += 1


class FlowReady(LightningFlow):
def __init__(self):
super().__init__()
self.w = WorkReady()

@property
def ready(self) -> bool:
return self.w.has_succeeded

def run(self):
self.w.run()

if self.ready:
self._exit()


def test_flow_ready():
"""This test validates the api publish state queue is populated only once ready is True."""

def run_patch(method):
app.api_publish_state_queue = MagicMock()
app.should_publish_changes_to_api = False
method()

app = LightningApp(FlowReady())
app._run = partial(run_patch, method=app._run)
MultiProcessRuntime(app, start_server=False).dispatch()

# Validates the state has been added only when ready was true.
state = app.api_publish_state_queue.put._mock_call_args[0][0]
call_hash = state["works"]["w"]["calls"]["latest_call_hash"]
assert state["works"]["w"]["calls"][call_hash]["statuses"][0]["stage"] == "succeeded"

0 comments on commit 852089e

Please sign in to comment.