Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

[App] Add ready property to the flow #15921

Merged
merged 11 commits into from Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions examples/app_boring/app_dynamic.py
Expand Up @@ -37,6 +37,12 @@ def __init__(self):
super().__init__()
self.dict = Dict()

@property
def ready(self) -> bool:
if "dst_w" in self.dict:
return self.dict["dst_w"].url != ""
return False

def run(self):
# create dynamically the source_work at runtime
if "src_w" not in self.dict:
Expand Down
3 changes: 3 additions & 0 deletions src/lightning_app/CHANGELOG.md
Expand Up @@ -12,10 +12,13 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
- Added the CLI command `lightning delete app` to delete a lightning app on the cloud ([#15783](https://github.com/Lightning-AI/lightning/pull/15783))

- Show a message when `BuildConfig(requirements=[...])` is passed but a `requirements.txt` file is already present in the Work ([#15799](https://github.com/Lightning-AI/lightning/pull/15799))

- Show a message when `BuildConfig(dockerfile="...")` is passed but a `Dockerfile` file is already present in the Work ([#15799](https://github.com/Lightning-AI/lightning/pull/15799))

- Added a CloudMultiProcessBackend which enables running a child App from within the Flow in the cloud ([#15800](https://github.com/Lightning-AI/lightning/pull/15800))

- Added the property `ready` of the LightningFlow to inform when the `Open App` should be visible ([#15921](https://github.com/Lightning-AI/lightning/pull/15921))


### Changed

Expand Down
14 changes: 9 additions & 5 deletions src/lightning_app/core/app.py
Expand Up @@ -142,6 +142,9 @@ def __init__(
self.exception = None
self.collect_changes: bool = True

# TODO: Enable ready locally for opening the UI.
self.ready = False
tchaton marked this conversation as resolved.
Show resolved Hide resolved

# NOTE: Checkpointing is disabled by default for the time being. We
# will enable it when resuming from full checkpoint is supported. Also,
# we will need to revisit the logic at _should_snapshot, since right now
Expand Down Expand Up @@ -446,6 +449,9 @@ def run_once(self):
done = True
self.stage = AppStage.STOPPING

if not self.ready:
self.ready = self.root.ready

self._last_run_time = time() - t0

self.on_run_once_end()
Expand Down Expand Up @@ -480,13 +486,11 @@ def _run(self) -> bool:
"""
self._original_state = deepcopy(self.state)
done = False
self.ready = self.root.ready

self._start_with_flow_works()

if self.should_publish_changes_to_api and self.api_publish_state_queue:
logger.debug("Publishing the state with changes")
# Push two states to optimize start in the cloud.
self.api_publish_state_queue.put(self.state_vars)
if self.ready and self.should_publish_changes_to_api and self.api_publish_state_queue:
self.api_publish_state_queue.put(self.state_vars)

self._reset_run_time_monitor()
Expand All @@ -496,7 +500,7 @@ def _run(self) -> bool:

self._update_run_time_monitor()

if self._has_updated and self.should_publish_changes_to_api and self.api_publish_state_queue:
if self.ready and self._has_updated and self.should_publish_changes_to_api and self.api_publish_state_queue:
self.api_publish_state_queue.put(self.state_vars)

self._has_updated = False
Expand Down
6 changes: 6 additions & 0 deletions src/lightning_app/core/flow.py
Expand Up @@ -230,6 +230,12 @@ def __getattr__(self, item):
return Path.from_dict(self._paths[item])
return self.__getattribute__(item)

@property
def ready(self) -> bool:
"""Override to customize when your App should be ready."""
tchaton marked this conversation as resolved.
Show resolved Hide resolved
flows = self.flows
return all(flow.ready for flow in flows.values()) if flows else True

@property
def changes(self):
return self._changes.copy()
Expand Down
46 changes: 45 additions & 1 deletion tests/tests_app/core/test_lightning_flow.py
Expand Up @@ -3,8 +3,9 @@
from collections import Counter
from copy import deepcopy
from dataclasses import dataclass
from functools import partial
from time import time
from unittest.mock import ANY
from unittest.mock import ANY, MagicMock

import pytest
from deepdiff import DeepDiff, Delta
Expand Down Expand Up @@ -859,3 +860,46 @@ def test_lightning_flow_flows_and_works():
"root.flows_dict.a.w",
"root.flows_list.0.w",
]


class WorkReady(LightningWork):
def __init__(self):
super().__init__(parallel=True)
self.counter = 0

def run(self):
self.counter += 1


class FlowReady(LightningFlow):
def __init__(self):
super().__init__()
self.w = WorkReady()

@property
def ready(self) -> bool:
return self.w.has_succeeded

def run(self):
self.w.run()

if self.ready:
self._exit()


def test_flow_ready():
"""This test validates the api publish state queue is populated only once ready is True."""

def run_patch(method):
app.api_publish_state_queue = MagicMock()
app.should_publish_changes_to_api = False
method()

app = LightningApp(FlowReady())
app._run = partial(run_patch, method=app._run)
MultiProcessRuntime(app, start_server=False).dispatch()

# Validates the state has been added only when ready was true.
state = app.api_publish_state_queue.put._mock_call_args[0][0]
call_hash = state["works"]["w"]["calls"]["latest_call_hash"]
assert state["works"]["w"]["calls"][call_hash]["statuses"][0]["stage"] == "succeeded"