diff --git a/examples/app_boring/app_dynamic.py b/examples/app_boring/app_dynamic.py index cfd303505a0ed..5edb1f2898012 100644 --- a/examples/app_boring/app_dynamic.py +++ b/examples/app_boring/app_dynamic.py @@ -37,6 +37,12 @@ def __init__(self): super().__init__() self.dict = Dict() + @property + def ready(self) -> bool: + if "dst_w" in self.dict: + return self.dict["dst_w"].url != "" + return False + def run(self): # create dynamically the source_work at runtime if "src_w" not in self.dict: diff --git a/src/lightning_app/CHANGELOG.md b/src/lightning_app/CHANGELOG.md index b3ab015ce44bc..ec7812e6e7417 100644 --- a/src/lightning_app/CHANGELOG.md +++ b/src/lightning_app/CHANGELOG.md @@ -12,10 +12,13 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). - Added the CLI command `lightning delete app` to delete a lightning app on the cloud ([#15783](https://github.com/Lightning-AI/lightning/pull/15783)) - Show a message when `BuildConfig(requirements=[...])` is passed but a `requirements.txt` file is already present in the Work ([#15799](https://github.com/Lightning-AI/lightning/pull/15799)) + - Show a message when `BuildConfig(dockerfile="...")` is passed but a `Dockerfile` file is already present in the Work ([#15799](https://github.com/Lightning-AI/lightning/pull/15799)) - Added a CloudMultiProcessBackend which enables running a child App from within the Flow in the cloud ([#15800](https://github.com/Lightning-AI/lightning/pull/15800)) +- Added the property `ready` of the LightningFlow to inform when the `Open App` should be visible ([#15921](https://github.com/Lightning-AI/lightning/pull/15921)) + ### Changed diff --git a/src/lightning_app/core/app.py b/src/lightning_app/core/app.py index 377a6a0d9e220..42cf0f241b47e 100644 --- a/src/lightning_app/core/app.py +++ b/src/lightning_app/core/app.py @@ -142,6 +142,9 @@ def __init__( self.exception = None self.collect_changes: bool = True + # TODO: Enable ready locally for opening the UI. + self.ready = False + # NOTE: Checkpointing is disabled by default for the time being. We # will enable it when resuming from full checkpoint is supported. Also, # we will need to revisit the logic at _should_snapshot, since right now @@ -446,6 +449,9 @@ def run_once(self): done = True self.stage = AppStage.STOPPING + if not self.ready: + self.ready = self.root.ready + self._last_run_time = time() - t0 self.on_run_once_end() @@ -480,13 +486,11 @@ def _run(self) -> bool: """ self._original_state = deepcopy(self.state) done = False + self.ready = self.root.ready self._start_with_flow_works() - if self.should_publish_changes_to_api and self.api_publish_state_queue: - logger.debug("Publishing the state with changes") - # Push two states to optimize start in the cloud. - self.api_publish_state_queue.put(self.state_vars) + if self.ready and self.should_publish_changes_to_api and self.api_publish_state_queue: self.api_publish_state_queue.put(self.state_vars) self._reset_run_time_monitor() @@ -496,7 +500,7 @@ def _run(self) -> bool: self._update_run_time_monitor() - if self._has_updated and self.should_publish_changes_to_api and self.api_publish_state_queue: + if self.ready and self._has_updated and self.should_publish_changes_to_api and self.api_publish_state_queue: self.api_publish_state_queue.put(self.state_vars) self._has_updated = False diff --git a/src/lightning_app/core/flow.py b/src/lightning_app/core/flow.py index 3773c7a0dd4f0..72527bf7aee6f 100644 --- a/src/lightning_app/core/flow.py +++ b/src/lightning_app/core/flow.py @@ -230,6 +230,12 @@ def __getattr__(self, item): return Path.from_dict(self._paths[item]) return self.__getattribute__(item) + @property + def ready(self) -> bool: + """Override to customize when your App should be ready.""" + flows = self.flows + return all(flow.ready for flow in flows.values()) if flows else True + @property def changes(self): return self._changes.copy() diff --git a/src/lightning_app/testing/testing.py b/src/lightning_app/testing/testing.py index 3e09012ac431c..8d112d7fa4a7a 100644 --- a/src/lightning_app/testing/testing.py +++ b/src/lightning_app/testing/testing.py @@ -362,17 +362,7 @@ def run_app_in_cloud( except playwright._impl._api_types.TimeoutError: print("'Create Project' dialog not visible, skipping.") - admin_page.locator(f"role=link[name='{name}']").click() - sleep(5) - # Scroll to the bottom of the page. Used to capture all logs. - admin_page.evaluate( - """ - var intervalID = setInterval(function () { - var scrollingElement = (document.scrollingElement || document.body); - scrollingElement.scrollTop = scrollingElement.scrollHeight; - }, 200); - """ - ) + admin_page.locator(f'[data-cy="{name}"]').click() client = LightningClient() project = _get_project(client) diff --git a/tests/tests_app/core/test_lightning_flow.py b/tests/tests_app/core/test_lightning_flow.py index c0cf780dc5eff..ed668c12e9b1b 100644 --- a/tests/tests_app/core/test_lightning_flow.py +++ b/tests/tests_app/core/test_lightning_flow.py @@ -3,8 +3,9 @@ from collections import Counter from copy import deepcopy from dataclasses import dataclass +from functools import partial from time import time -from unittest.mock import ANY +from unittest.mock import ANY, MagicMock import pytest from deepdiff import DeepDiff, Delta @@ -859,3 +860,46 @@ def test_lightning_flow_flows_and_works(): "root.flows_dict.a.w", "root.flows_list.0.w", ] + + +class WorkReady(LightningWork): + def __init__(self): + super().__init__(parallel=True) + self.counter = 0 + + def run(self): + self.counter += 1 + + +class FlowReady(LightningFlow): + def __init__(self): + super().__init__() + self.w = WorkReady() + + @property + def ready(self) -> bool: + return self.w.has_succeeded + + def run(self): + self.w.run() + + if self.ready: + self._exit() + + +def test_flow_ready(): + """This test validates the api publish state queue is populated only once ready is True.""" + + def run_patch(method): + app.api_publish_state_queue = MagicMock() + app.should_publish_changes_to_api = False + method() + + app = LightningApp(FlowReady()) + app._run = partial(run_patch, method=app._run) + MultiProcessRuntime(app, start_server=False).dispatch() + + # Validates the state has been added only when ready was true. + state = app.api_publish_state_queue.put._mock_call_args[0][0] + call_hash = state["works"]["w"]["calls"]["latest_call_hash"] + assert state["works"]["w"]["calls"][call_hash]["statuses"][0]["stage"] == "succeeded"