diff --git a/examples/app_boring/app.py b/examples/app_boring/app.py index aad288a11acb4..78a9b1c819f06 100644 --- a/examples/app_boring/app.py +++ b/examples/app_boring/app.py @@ -43,6 +43,10 @@ def __init__(self): raise_exception=True, ) + @property + def ready(self) -> bool: + return self.dest_work.is_running + def run(self): self.source_work.run() if self.source_work.has_succeeded: diff --git a/src/lightning_app/components/serve/streamlit.py b/src/lightning_app/components/serve/streamlit.py index 9b943a1708fa3..720139f93f25e 100644 --- a/src/lightning_app/components/serve/streamlit.py +++ b/src/lightning_app/components/serve/streamlit.py @@ -20,6 +20,8 @@ class ServeStreamlit(LightningWork, abc.ABC): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) + self.ready = False + self._process = None @property @@ -58,6 +60,7 @@ def run(self) -> None: ], env=env, ) + self.ready = True self._process.wait() def on_exit(self) -> None: diff --git a/src/lightning_app/core/api.py b/src/lightning_app/core/api.py index e6f7b6ad0024c..4a439fa87bd82 100644 --- a/src/lightning_app/core/api.py +++ b/src/lightning_app/core/api.py @@ -34,6 +34,7 @@ from lightning_app.core.queues import QueuingSystem from lightning_app.storage import Drive from lightning_app.utilities.app_helpers import InMemoryStateStore, Logger, StateStore +from lightning_app.utilities.app_status import AppStatus from lightning_app.utilities.cloud import is_running_in_cloud from lightning_app.utilities.component import _context from lightning_app.utilities.enum import ComponentContext, OpenAPITags @@ -66,18 +67,24 @@ class SessionMiddleware: lock = Lock() app_spec: Optional[List] = None +app_status: Optional[AppStatus] = None + # In the future, this would be abstracted to support horizontal scaling. responses_store = {} logger = Logger(__name__) - # This can be replaced with a consumer that publishes states in a kv-store # in a serverless architecture class UIRefresher(Thread): - def __init__(self, api_publish_state_queue, api_response_queue, refresh_interval: float = 0.1) -> None: + def __init__( + self, + api_publish_state_queue, + api_response_queue, + refresh_interval: float = 0.1, + ) -> None: super().__init__(daemon=True) self.api_publish_state_queue = api_publish_state_queue self.api_response_queue = api_response_queue @@ -98,7 +105,8 @@ def run(self): def run_once(self): try: - state = self.api_publish_state_queue.get(timeout=0) + global app_status + state, app_status = self.api_publish_state_queue.get(timeout=0) with lock: global_app_state_store.set_app_state(TEST_SESSION_UUID, state) except queue.Empty: @@ -326,6 +334,17 @@ async def upload_file(response: Response, filename: str, uploaded_file: UploadFi return f"Successfully uploaded '{filename}' to the Drive" +@fastapi_service.get("/api/v1/status", response_model=AppStatus) +async def get_status() -> AppStatus: + """Get the current status of the app and works.""" + global app_status + if app_status is None: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="App status hasn't been reported yet." + ) + return app_status + + @fastapi_service.get("/healthz", status_code=200) async def healthz(response: Response): """Health check endpoint used in the cloud FastAPI servers to check the status periodically.""" diff --git a/src/lightning_app/core/app.py b/src/lightning_app/core/app.py index 9c3aeeb650de0..dfce2097ec5a0 100644 --- a/src/lightning_app/core/app.py +++ b/src/lightning_app/core/app.py @@ -35,6 +35,7 @@ _should_dispatch_app, Logger, ) +from lightning_app.utilities.app_status import AppStatus from lightning_app.utilities.commands.base import _process_requests from lightning_app.utilities.component import _convert_paths_after_init, _validate_root_flow from lightning_app.utilities.enum import AppStage, CacheCallsKeys @@ -140,6 +141,7 @@ def __init__( self.exception = None self.collect_changes: bool = True + self.status: Optional[AppStatus] = None # TODO: Enable ready locally for opening the UI. self.ready = False @@ -150,6 +152,7 @@ def __init__( self.checkpointing: bool = False self._update_layout() + self._update_status() self.is_headless: Optional[bool] = None @@ -418,6 +421,7 @@ def run_once(self): self._update_layout() self._update_is_headless() + self._update_status() self.maybe_apply_changes() if self.checkpointing and self._should_snapshot(): @@ -485,19 +489,12 @@ def _run(self) -> bool: self._original_state = deepcopy(self.state) done = False - # TODO: Re-enable the `ready` property once issues are resolved - if not self.root.ready: - warnings.warn( - "One of your Flows returned `.ready` as `False`. " - "This feature is not yet enabled so this will be ignored.", - UserWarning, - ) - self.ready = True + self.ready = self.root.ready self._start_with_flow_works() - if self.ready and self.should_publish_changes_to_api and self.api_publish_state_queue: - self.api_publish_state_queue.put(self.state_vars) + if self.should_publish_changes_to_api and self.api_publish_state_queue is not None: + self.api_publish_state_queue.put((self.state_vars, self.status)) self._reset_run_time_monitor() @@ -506,8 +503,8 @@ def _run(self) -> bool: self._update_run_time_monitor() - if self.ready and self._has_updated and self.should_publish_changes_to_api and self.api_publish_state_queue: - self.api_publish_state_queue.put(self.state_vars) + if self._has_updated and self.should_publish_changes_to_api and self.api_publish_state_queue is not None: + self.api_publish_state_queue.put((self.state_vars, self.status)) self._has_updated = False @@ -532,6 +529,23 @@ def _update_is_headless(self) -> None: # This ensures support for apps which dynamically add a UI at runtime. _handle_is_headless(self) + def _update_status(self) -> None: + old_status = self.status + + work_statuses = {} + for work in breadth_first(self.root, types=(lightning_app.LightningWork,)): + work_statuses[work.name] = work.status + + self.status = AppStatus( + is_ui_ready=self.ready, + work_statuses=work_statuses, + ) + + # If the work statuses changed, the state delta will trigger an update. + # If ready has changed, we trigger an update manually. + if self.status != old_status: + self._has_updated = True + def _apply_restarting(self) -> bool: self._reset_original_state() # apply stage after restoring the original state. diff --git a/src/lightning_app/core/flow.py b/src/lightning_app/core/flow.py index 0be6b6f8ade98..5987425713489 100644 --- a/src/lightning_app/core/flow.py +++ b/src/lightning_app/core/flow.py @@ -249,10 +249,7 @@ def __getattr__(self, item): @property def ready(self) -> bool: - """Not currently enabled. - - Override to customize when your App should be ready. - """ + """Override to customize when your App should be ready.""" flows = self.flows return all(flow.ready for flow in flows.values()) if flows else True diff --git a/src/lightning_app/core/work.py b/src/lightning_app/core/work.py index b34f42f2fef9a..d99df13d4f933 100644 --- a/src/lightning_app/core/work.py +++ b/src/lightning_app/core/work.py @@ -12,13 +12,13 @@ from lightning_app.storage.drive import _maybe_create_drive, Drive from lightning_app.storage.payload import Payload from lightning_app.utilities.app_helpers import _is_json_serializable, _LightningAppRef, is_overridden +from lightning_app.utilities.app_status import WorkStatus from lightning_app.utilities.component import _is_flow_context, _sanitize_state from lightning_app.utilities.enum import ( CacheCallsKeys, make_status, WorkFailureReasons, WorkStageStatus, - WorkStatus, WorkStopReasons, ) from lightning_app.utilities.exceptions import LightningWorkException diff --git a/src/lightning_app/runners/runtime.py b/src/lightning_app/runners/runtime.py index a30b78f9178a0..c6d8c3d4394b9 100644 --- a/src/lightning_app/runners/runtime.py +++ b/src/lightning_app/runners/runtime.py @@ -121,7 +121,7 @@ def terminate(self) -> None: self._add_stopped_status_to_work(work) # Publish the updated state and wait for the frontend to update. - self.app.api_publish_state_queue.put(self.app.state) + self.app.api_publish_state_queue.put((self.app.state, self.app.status)) for thread in self.threads + self.app.threads: thread.join(timeout=0) diff --git a/src/lightning_app/utilities/app_status.py b/src/lightning_app/utilities/app_status.py new file mode 100644 index 0000000000000..232c3f0b65210 --- /dev/null +++ b/src/lightning_app/utilities/app_status.py @@ -0,0 +1,29 @@ +from datetime import datetime +from typing import Any, Dict, Optional + +from pydantic import BaseModel + + +class WorkStatus(BaseModel): + """The ``WorkStatus`` captures the status of a work according to the app.""" + + stage: str + timestamp: float + reason: Optional[str] = None + message: Optional[str] = None + count: int = 1 + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + + assert self.timestamp > 0 and self.timestamp < (int(datetime.now().timestamp()) + 10) + + +class AppStatus(BaseModel): + """The ``AppStatus`` captures the current status of the app and its components.""" + + # ``True`` when the app UI is ready to be viewed + is_ui_ready: bool + + # The statuses of ``LightningWork`` objects currently associated with this app + work_statuses: Dict[str, WorkStatus] diff --git a/src/lightning_app/utilities/enum.py b/src/lightning_app/utilities/enum.py index 11cd7fabc4299..4c92ffba3db11 100644 --- a/src/lightning_app/utilities/enum.py +++ b/src/lightning_app/utilities/enum.py @@ -1,5 +1,4 @@ import enum -from dataclasses import dataclass from datetime import datetime, timezone from typing import Optional @@ -47,18 +46,6 @@ class WorkStageStatus: FAILED = "failed" -@dataclass -class WorkStatus: - stage: WorkStageStatus - timestamp: float - reason: Optional[str] = None - message: Optional[str] = None - count: int = 1 - - def __post_init__(self): - assert self.timestamp > 0 and self.timestamp < (int(datetime.now().timestamp()) + 10) - - def make_status(stage: str, message: Optional[str] = None, reason: Optional[str] = None): status = { "stage": stage, diff --git a/tests/tests_app/core/test_lightning_api.py b/tests/tests_app/core/test_lightning_api.py index f3eb8f9bacda9..adad9fba932e0 100644 --- a/tests/tests_app/core/test_lightning_api.py +++ b/tests/tests_app/core/test_lightning_api.py @@ -31,6 +31,7 @@ from lightning_app.runners import MultiProcessRuntime from lightning_app.storage.drive import Drive from lightning_app.testing.helpers import _MockQueue +from lightning_app.utilities.app_status import AppStatus from lightning_app.utilities.component import _set_frontend_context, _set_work_context from lightning_app.utilities.enum import AppStage from lightning_app.utilities.load_app import extract_metadata_from_app @@ -195,7 +196,7 @@ def test_update_publish_state_and_maybe_refresh_ui(): publish_state_queue = _MockQueue("publish_state_queue") api_response_queue = _MockQueue("api_response_queue") - publish_state_queue.put(app.state_with_changes) + publish_state_queue.put((app.state_with_changes, None)) thread = UIRefresher(publish_state_queue, api_response_queue) thread.run_once() @@ -226,7 +227,7 @@ def get(self, timeout: int = 0): has_started_queue = _MockQueue("has_started_queue") api_response_queue = _MockQueue("api_response_queue") state = app.state_with_changes - publish_state_queue.put(state) + publish_state_queue.put((state, AppStatus(is_ui_ready=True, work_statuses={}))) spec = extract_metadata_from_app(app) ui_refresher = start_server( publish_state_queue, @@ -284,6 +285,9 @@ def get(self, timeout: int = 0): {"name": "main_4", "content": "https://te"}, ] + response = await client.get("/api/v1/status") + assert response.json() == {"is_ui_ready": True, "work_statuses": {}} + response = await client.post("/api/v1/state", json={"state": new_state}, headers=headers) assert change_state_queue._queue[1].to_dict() == { "values_changed": {"root['vars']['counter']": {"new_value": 1}} diff --git a/tests/tests_app/core/test_lightning_flow.py b/tests/tests_app/core/test_lightning_flow.py index c2aa52b8e6189..6aad7a9ee510b 100644 --- a/tests/tests_app/core/test_lightning_flow.py +++ b/tests/tests_app/core/test_lightning_flow.py @@ -5,7 +5,7 @@ from dataclasses import dataclass from functools import partial from time import time -from unittest.mock import ANY, MagicMock +from unittest.mock import ANY import pytest from deepdiff import DeepDiff, Delta @@ -19,7 +19,7 @@ from lightning_app.storage.path import _storage_root_dir from lightning_app.structures import Dict as LDict from lightning_app.structures import List as LList -from lightning_app.testing.helpers import EmptyFlow, EmptyWork +from lightning_app.testing.helpers import _MockQueue, EmptyFlow, EmptyWork from lightning_app.utilities.app_helpers import ( _delta_to_app_state_delta, _LightningAppRef, @@ -891,21 +891,37 @@ def run(self): def test_flow_ready(): - """This test validates the api publish state queue is populated only once ready is True.""" + """This test validates that the app status queue is populated correctly.""" + + mock_queue = _MockQueue("api_publish_state_queue") def run_patch(method): - app.api_publish_state_queue = MagicMock() - app.should_publish_changes_to_api = False + app.should_publish_changes_to_api = True + app.api_publish_state_queue = mock_queue method() + state = {"done": False} + + def lagged_run_once(method): + """Ensure that the full loop is run after the app exits.""" + new_done = method() + if state["done"]: + return True + state["done"] = new_done + return False + app = LightningApp(FlowReady()) app._run = partial(run_patch, method=app._run) + app.run_once = partial(lagged_run_once, method=app.run_once) MultiProcessRuntime(app, start_server=False).dispatch() - # Validates the state has been added only when ready was true. - state = app.api_publish_state_queue.put._mock_call_args[0][0] - call_hash = state["works"]["w"]["calls"]["latest_call_hash"] - assert state["works"]["w"]["calls"][call_hash]["statuses"][0]["stage"] == "succeeded" + _, first_status = mock_queue.get() + assert not first_status.is_ui_ready + + _, last_status = mock_queue.get() + while len(mock_queue) > 0: + _, last_status = mock_queue.get() + assert last_status.is_ui_ready def test_structures_register_work_cloudcompute(): diff --git a/tests/tests_app/storage/test_orchestrator.py b/tests/tests_app/storage/test_orchestrator.py index ca671e6b93704..4b391a890f1a9 100644 --- a/tests/tests_app/storage/test_orchestrator.py +++ b/tests/tests_app/storage/test_orchestrator.py @@ -39,7 +39,7 @@ def test_orchestrator(): # orchestrator is now waiting for a response for copier in Work A assert "work_b" in orchestrator.waiting_for_response - assert not request_queues["work_a"] + assert len(request_queues["work_a"]) == 0 assert request in copy_request_queues["work_a"] assert request.destination == "work_b" @@ -54,7 +54,7 @@ def test_orchestrator(): # orchestrator processes confirmation and confirms to the pending request from Work B orchestrator.run_once("work_a") - assert not copy_response_queues["work_a"] + assert len(copy_response_queues["work_a"]) == 0 assert response in response_queues["work_b"] assert not orchestrator.waiting_for_response orchestrator.run_once("work_b") @@ -71,7 +71,7 @@ def test_orchestrator(): assert response.exception is None # all queues should be empty - assert all(not queue for queue in request_queues.values()) - assert all(not queue for queue in response_queues.values()) - assert all(not queue for queue in copy_request_queues.values()) - assert all(not queue for queue in copy_response_queues.values()) + assert all(len(queue) == 0 for queue in request_queues.values()) + assert all(len(queue) == 0 for queue in response_queues.values()) + assert all(len(queue) == 0 for queue in copy_request_queues.values()) + assert all(len(queue) == 0 for queue in copy_response_queues.values()) diff --git a/tests/tests_app/storage/test_path.py b/tests/tests_app/storage/test_path.py index 3cd501f7344c8..2310b8034c303 100644 --- a/tests/tests_app/storage/test_path.py +++ b/tests/tests_app/storage/test_path.py @@ -606,7 +606,7 @@ def test_path_response_not_matching_reqeuest(tmpdir): path.get() # simulate a response that has a different hash than the request had - assert not response_queue + assert len(response_queue) == 0 response.path = str(path) response.hash = "other_hash" response_queue.put(response) diff --git a/tests/tests_app/utilities/test_proxies.py b/tests/tests_app/utilities/test_proxies.py index 4b8a5f25f71e3..a53d8e85a3d37 100644 --- a/tests/tests_app/utilities/test_proxies.py +++ b/tests/tests_app/utilities/test_proxies.py @@ -250,6 +250,7 @@ def __call__(self): state = deepcopy(self.work.state) self.work._calls[call_hash]["statuses"].append( { + "name": self.work.name, "stage": WorkStageStatus.FAILED, "reason": WorkFailureReasons.TIMEOUT, "timestamp": time.time(), @@ -547,7 +548,7 @@ def run(self, use_setattr=False, use_containers=False): # 1. Simulate no state changes ############################## work.run(use_setattr=False, use_containers=False) - assert not delta_queue + assert len(delta_queue) == 0 ############################ # 2. Simulate a setattr call @@ -563,16 +564,16 @@ def run(self, use_setattr=False, use_containers=False): assert len(observer._delta_memory) == 1 # The observer should not trigger any deltas being sent and only consume the delta memory - assert not delta_queue + assert len(delta_queue) == 0 observer.run_once() - assert not delta_queue + assert len(delta_queue) == 0 assert not observer._delta_memory ################################ # 3. Simulate a container update ################################ work.run(use_setattr=False, use_containers=True) - assert not delta_queue + assert len(delta_queue) == 0 assert not observer._delta_memory observer.run_once() observer.run_once() # multiple runs should not affect how many deltas are sent unless there are changes @@ -591,7 +592,7 @@ def run(self, use_setattr=False, use_containers=False): delta = delta_queue.get().delta.to_dict() assert delta == {"values_changed": {"root['vars']['var']": {"new_value": 3}}} - assert not delta_queue + assert len(delta_queue) == 0 assert len(observer._delta_memory) == 1 observer.run_once() @@ -599,7 +600,7 @@ def run(self, use_setattr=False, use_containers=False): assert delta["values_changed"] == {"root['vars']['dict']['counter']": {"new_value": 2}} assert delta["iterable_item_added"] == {"root['vars']['list'][1]": 1} - assert not delta_queue + assert len(delta_queue) == 0 assert not observer._delta_memory