From abff4c2601968512291ca9a1157530a10fc0307a Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Fri, 9 Dec 2022 12:28:24 +0000 Subject: [PATCH 01/17] Initial commit --- src/lightning_app/core/api.py | 41 ++++++++++++++++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/src/lightning_app/core/api.py b/src/lightning_app/core/api.py index e6f7b6ad0024c..d6d6c48804473 100644 --- a/src/lightning_app/core/api.py +++ b/src/lightning_app/core/api.py @@ -71,7 +71,6 @@ class SessionMiddleware: logger = Logger(__name__) - # This can be replaced with a consumer that publishes states in a kv-store # in a serverless architecture @@ -123,6 +122,27 @@ class StateUpdate(BaseModel): state: dict = {} +class WorkStatus(BaseModel): + """The ``WorkStatus`` captures the status of a work according to the app.""" + + # The name of the work + name: str + + # ``True`` when the work is running according to the app. + # Compute states in the cloud are owned by the platform. + is_running: bool + + +class AppStatus(BaseModel): + """The ``AppStatus`` captures the current status of the app and its components.""" + + # ``True`` when the app UI is ready to be viewed + is_ui_ready: bool + + # The statuses of ``LightningWork`` objects currently associated with this app + work_statuses: List[WorkStatus] + + openapi_tags = [ { "name": OpenAPITags.APP_CLIENT_COMMAND, @@ -326,6 +346,25 @@ async def upload_file(response: Response, filename: str, uploaded_file: UploadFi return f"Successfully uploaded '{filename}' to the Drive" +@fastapi_service.get("/api/v1/status", response_class=JSONResponse) +async def get_status( + response: Response, + x_lightning_session_uuid: Optional[str] = Header(None), + x_lightning_session_id: Optional[str] = Header(None), +) -> Union[List, Dict]: + if x_lightning_session_uuid is None: + raise Exception("Missing X-Lightning-Session-UUID header") + if x_lightning_session_id is None: + raise Exception("Missing X-Lightning-Session-ID header") + + if not ENABLE_PULLING_STATE_ENDPOINT: + response.status_code = status.HTTP_405_METHOD_NOT_ALLOWED + return {"status": "failure", "reason": "This endpoint is disabled."} + + global app_spec + return app_spec or [] + + @fastapi_service.get("/healthz", status_code=200) async def healthz(response: Response): """Health check endpoint used in the cloud FastAPI servers to check the status periodically.""" From 0aeddd61842af3ef545515c349c218923f4101a5 Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Fri, 9 Dec 2022 19:21:36 +0000 Subject: [PATCH 02/17] Updates --- src/lightning_app/core/api.py | 59 ++++++++----------- src/lightning_app/core/app.py | 7 +++ src/lightning_app/core/queues.py | 5 ++ src/lightning_app/runners/backends/backend.py | 1 + src/lightning_app/utilities/app_status.py | 24 ++++++++ 5 files changed, 61 insertions(+), 35 deletions(-) create mode 100644 src/lightning_app/utilities/app_status.py diff --git a/src/lightning_app/core/api.py b/src/lightning_app/core/api.py index d6d6c48804473..075264258e782 100644 --- a/src/lightning_app/core/api.py +++ b/src/lightning_app/core/api.py @@ -34,6 +34,7 @@ from lightning_app.core.queues import QueuingSystem from lightning_app.storage import Drive from lightning_app.utilities.app_helpers import InMemoryStateStore, Logger, StateStore +from lightning_app.utilities.app_status import AppStatus from lightning_app.utilities.cloud import is_running_in_cloud from lightning_app.utilities.component import _context from lightning_app.utilities.enum import ComponentContext, OpenAPITags @@ -66,6 +67,8 @@ class SessionMiddleware: lock = Lock() app_spec: Optional[List] = None +app_status: Optional[AppStatus] = None + # In the future, this would be abstracted to support horizontal scaling. responses_store = {} @@ -76,10 +79,17 @@ class SessionMiddleware: class UIRefresher(Thread): - def __init__(self, api_publish_state_queue, api_response_queue, refresh_interval: float = 0.1) -> None: + def __init__( + self, + api_publish_state_queue, + api_response_queue, + app_status_queue, + refresh_interval: float = 0.1, + ) -> None: super().__init__(daemon=True) self.api_publish_state_queue = api_publish_state_queue self.api_response_queue = api_response_queue + self.app_status_queue = app_status_queue self._exit_event = Event() self.refresh_interval = refresh_interval @@ -113,6 +123,12 @@ def run_once(self): except queue.Empty: pass + try: + global app_status + app_status = self.app_status_queue.get(timeout=0) + except queue.Empty: + pass + def join(self, timeout: Optional[float] = None) -> None: self._exit_event.set() super().join(timeout) @@ -122,27 +138,6 @@ class StateUpdate(BaseModel): state: dict = {} -class WorkStatus(BaseModel): - """The ``WorkStatus`` captures the status of a work according to the app.""" - - # The name of the work - name: str - - # ``True`` when the work is running according to the app. - # Compute states in the cloud are owned by the platform. - is_running: bool - - -class AppStatus(BaseModel): - """The ``AppStatus`` captures the current status of the app and its components.""" - - # ``True`` when the app UI is ready to be viewed - is_ui_ready: bool - - # The statuses of ``LightningWork`` objects currently associated with this app - work_statuses: List[WorkStatus] - - openapi_tags = [ { "name": OpenAPITags.APP_CLIENT_COMMAND, @@ -346,23 +341,16 @@ async def upload_file(response: Response, filename: str, uploaded_file: UploadFi return f"Successfully uploaded '{filename}' to the Drive" -@fastapi_service.get("/api/v1/status", response_class=JSONResponse) +@fastapi_service.get("/api/v1/status", response_class=AppStatus) async def get_status( response: Response, - x_lightning_session_uuid: Optional[str] = Header(None), - x_lightning_session_id: Optional[str] = Header(None), ) -> Union[List, Dict]: - if x_lightning_session_uuid is None: - raise Exception("Missing X-Lightning-Session-UUID header") - if x_lightning_session_id is None: - raise Exception("Missing X-Lightning-Session-ID header") + if app_status is None: + response.status_code = status.HTTP_404_NOT_FOUND + return {"status": "failure", "reason": "App status hasn't been reported yet."} - if not ENABLE_PULLING_STATE_ENDPOINT: - response.status_code = status.HTTP_405_METHOD_NOT_ALLOWED - return {"status": "failure", "reason": "This endpoint is disabled."} - - global app_spec - return app_spec or [] + global app_status + return app_status @fastapi_service.get("/healthz", status_code=200) @@ -448,6 +436,7 @@ def start_server( api_publish_state_queue, api_delta_queue, api_response_queue, + app_status_queue, has_started_queue: Optional[Queue] = None, host="127.0.0.1", port=8000, diff --git a/src/lightning_app/core/app.py b/src/lightning_app/core/app.py index d9389ecd27e24..b051f8703b113 100644 --- a/src/lightning_app/core/app.py +++ b/src/lightning_app/core/app.py @@ -35,6 +35,7 @@ _should_dispatch_app, Logger, ) +from lightning_app.utilities.app_status import AppStatus from lightning_app.utilities.commands.base import _process_requests from lightning_app.utilities.component import _convert_paths_after_init, _validate_root_flow from lightning_app.utilities.enum import AppStage, CacheCallsKeys @@ -117,6 +118,7 @@ def __init__( self.api_response_queue: Optional[BaseQueue] = None self.api_publish_state_queue: Optional[BaseQueue] = None self.api_delta_queue: Optional[BaseQueue] = None + self.app_status_queue: Optional[BaseQueue] = None self.error_queue: Optional[BaseQueue] = None self.request_queues: Optional[Dict[str, BaseQueue]] = None self.response_queues: Optional[Dict[str, BaseQueue]] = None @@ -140,6 +142,7 @@ def __init__( self.exception = None self.collect_changes: bool = True + self.status: Optional[AppStatus] = None # TODO: Enable ready locally for opening the UI. self.ready = False @@ -535,6 +538,10 @@ def _update_is_headless(self) -> None: _handle_is_headless(self) + def _update_status(self) -> None: + # old_status = self.status + pass + def _apply_restarting(self) -> bool: self._reset_original_state() # apply stage after restoring the original state. diff --git a/src/lightning_app/core/queues.py b/src/lightning_app/core/queues.py index db150a57eb098..5d04b5bc9027f 100644 --- a/src/lightning_app/core/queues.py +++ b/src/lightning_app/core/queues.py @@ -38,6 +38,7 @@ CALLER_QUEUE_CONSTANT = "CALLER_QUEUE" API_STATE_PUBLISH_QUEUE_CONSTANT = "API_STATE_PUBLISH_QUEUE" API_DELTA_QUEUE_CONSTANT = "API_DELTA_QUEUE" +APP_STATUS_QUEUE_CONSTANT = "APP_STATUS_QUEUE" API_REFRESH_QUEUE_CONSTANT = "API_REFRESH_QUEUE" ORCHESTRATOR_REQUEST_CONSTANT = "ORCHESTRATOR_REQUEST" ORCHESTRATOR_RESPONSE_CONSTANT = "ORCHESTRATOR_RESPONSE" @@ -96,6 +97,10 @@ def get_api_delta_queue(self, queue_id: Optional[str] = None) -> "BaseQueue": queue_name = f"{queue_id}_{DELTA_QUEUE_CONSTANT}" if queue_id else DELTA_QUEUE_CONSTANT return self.get_queue(queue_name) + def get_app_status_queue(self, queue_id: Optional[str] = None) -> "BaseQueue": + queue_name = f"{queue_id}_{APP_STATUS_QUEUE_CONSTANT}" if queue_id else APP_STATUS_QUEUE_CONSTANT + return self.get_queue(queue_name) + def get_orchestrator_request_queue(self, work_name: str, queue_id: Optional[str] = None) -> "BaseQueue": queue_name = ( f"{queue_id}_{ORCHESTRATOR_REQUEST_CONSTANT}_{work_name}" diff --git a/src/lightning_app/runners/backends/backend.py b/src/lightning_app/runners/backends/backend.py index 54c1f9092bf0f..b4114286f8109 100644 --- a/src/lightning_app/runners/backends/backend.py +++ b/src/lightning_app/runners/backends/backend.py @@ -88,6 +88,7 @@ def _prepare_queues(self, app: "lightning_app.LightningApp"): app.error_queue = self.queues.get_error_queue(**kw) app.api_publish_state_queue = self.queues.get_api_state_publish_queue(**kw) app.api_delta_queue = app.delta_queue + app.app_status_queue = self.queues.get_app_status_queue(**kw) app.request_queues = {} app.response_queues = {} app.copy_request_queues = {} diff --git a/src/lightning_app/utilities/app_status.py b/src/lightning_app/utilities/app_status.py new file mode 100644 index 0000000000000..fc3fd9457ffa5 --- /dev/null +++ b/src/lightning_app/utilities/app_status.py @@ -0,0 +1,24 @@ +from typing import List + +from pydantic import BaseModel + + +class WorkStatus(BaseModel): + """The ``WorkStatus`` captures the status of a work according to the app.""" + + # The name of the work + name: str + + # ``True`` when the work is running according to the app. + # Compute states in the cloud are owned by the platform. + is_running: bool + + +class AppStatus(BaseModel): + """The ``AppStatus`` captures the current status of the app and its components.""" + + # ``True`` when the app UI is ready to be viewed + is_ui_ready: bool + + # The statuses of ``LightningWork`` objects currently associated with this app + work_statuses: List[WorkStatus] From 2b683102736c1b5b3539f2f16d342a52caa4e247 Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Wed, 14 Dec 2022 20:44:19 +0000 Subject: [PATCH 03/17] Updates --- src/lightning_app/core/api.py | 17 ++++++--------- src/lightning_app/core/app.py | 26 ++++++++++++++--------- src/lightning_app/core/work.py | 6 +++--- src/lightning_app/runners/multiprocess.py | 1 + src/lightning_app/utilities/app_status.py | 14 ++++++------ src/lightning_app/utilities/enum.py | 15 +------------ 6 files changed, 36 insertions(+), 43 deletions(-) diff --git a/src/lightning_app/core/api.py b/src/lightning_app/core/api.py index 075264258e782..5aaeee4511446 100644 --- a/src/lightning_app/core/api.py +++ b/src/lightning_app/core/api.py @@ -341,15 +341,12 @@ async def upload_file(response: Response, filename: str, uploaded_file: UploadFi return f"Successfully uploaded '{filename}' to the Drive" -@fastapi_service.get("/api/v1/status", response_class=AppStatus) -async def get_status( - response: Response, -) -> Union[List, Dict]: - if app_status is None: - response.status_code = status.HTTP_404_NOT_FOUND - return {"status": "failure", "reason": "App status hasn't been reported yet."} - +@fastapi_service.get("/api/v1/status", response_model=AppStatus) +async def get_status() -> AppStatus: + """Get the current status of the app and works.""" global app_status + if app_status is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="App status hasn't been reported yet.") return app_status @@ -436,7 +433,7 @@ def start_server( api_publish_state_queue, api_delta_queue, api_response_queue, - app_status_queue, + app_status_queue: Optional[Queue] = None, has_started_queue: Optional[Queue] = None, host="127.0.0.1", port=8000, @@ -458,7 +455,7 @@ def start_server( global_app_state_store.add(TEST_SESSION_UUID) - refresher = UIRefresher(api_publish_state_queue, api_response_queue) + refresher = UIRefresher(api_publish_state_queue, api_response_queue, app_status_queue) refresher.setDaemon(True) refresher.start() diff --git a/src/lightning_app/core/app.py b/src/lightning_app/core/app.py index b051f8703b113..b348f439eb446 100644 --- a/src/lightning_app/core/app.py +++ b/src/lightning_app/core/app.py @@ -421,6 +421,7 @@ def run_once(self): self._update_layout() self._update_is_headless() + self._update_status() self.maybe_apply_changes() if self.checkpointing and self._should_snapshot(): @@ -488,14 +489,7 @@ def _run(self) -> bool: self._original_state = deepcopy(self.state) done = False - # TODO: Re-enable the `ready` property once issues are resolved - if not self.root.ready: - warnings.warn( - "One of your Flows returned `.ready` as `False`. " - "This feature is not yet enabled so this will be ignored.", - UserWarning, - ) - self.ready = True + self.ready = self.root.ready self._start_with_flow_works() @@ -539,8 +533,20 @@ def _update_is_headless(self) -> None: _handle_is_headless(self) def _update_status(self) -> None: - # old_status = self.status - pass + old_status = self.status + + work_statuses = [] + for work in breadth_first(self.root, types=(lightning_app.LightningWork,)): + work_statuses.append(work.status) + + new_status = AppStatus( + is_ui_ready=self.ready, + work_statuses=work_statuses, + ) + + if new_status != old_status: + self.status = new_status + self.app_status_queue.put(self.status) def _apply_restarting(self) -> bool: self._reset_original_state() diff --git a/src/lightning_app/core/work.py b/src/lightning_app/core/work.py index 60d1ea62d8afb..6ac15a4a20610 100644 --- a/src/lightning_app/core/work.py +++ b/src/lightning_app/core/work.py @@ -12,13 +12,13 @@ from lightning_app.storage.drive import _maybe_create_drive, Drive from lightning_app.storage.payload import Payload from lightning_app.utilities.app_helpers import _is_json_serializable, _LightningAppRef, is_overridden +from lightning_app.utilities.app_status import WorkStatus from lightning_app.utilities.component import _is_flow_context, _sanitize_state from lightning_app.utilities.enum import ( CacheCallsKeys, make_status, WorkFailureReasons, WorkStageStatus, - WorkStatus, WorkStopReasons, ) from lightning_app.utilities.exceptions import LightningWorkException @@ -267,8 +267,8 @@ def status(self) -> WorkStatus: latest_status = statuses[-1] if latest_status.get("reason") == WorkFailureReasons.TIMEOUT: return self._aggregate_status_timeout(statuses) - return WorkStatus(**latest_status) - return WorkStatus(stage=WorkStageStatus.NOT_STARTED, timestamp=time.time()) + return WorkStatus(name=self.name, **latest_status) + return WorkStatus(name=self.name, stage=WorkStageStatus.NOT_STARTED, timestamp=time.time()) @property def statuses(self) -> List[WorkStatus]: diff --git a/src/lightning_app/runners/multiprocess.py b/src/lightning_app/runners/multiprocess.py index 673e8601043d7..7839ed9a96ecd 100644 --- a/src/lightning_app/runners/multiprocess.py +++ b/src/lightning_app/runners/multiprocess.py @@ -91,6 +91,7 @@ def dispatch(self, *args: Any, open_ui: bool = True, **kwargs: Any): api_response_queue=self.app.api_response_queue, api_publish_state_queue=self.app.api_publish_state_queue, api_delta_queue=self.app.api_delta_queue, + app_status_queue=self.app.app_status_queue, has_started_queue=has_started_queue, spec=extract_metadata_from_app(self.app), root_path=self.app.root_path, diff --git a/src/lightning_app/utilities/app_status.py b/src/lightning_app/utilities/app_status.py index fc3fd9457ffa5..95248433d782f 100644 --- a/src/lightning_app/utilities/app_status.py +++ b/src/lightning_app/utilities/app_status.py @@ -1,17 +1,19 @@ -from typing import List +from typing import List, Optional from pydantic import BaseModel +from lightning_app.utilities.enum import WorkStageStatus + class WorkStatus(BaseModel): """The ``WorkStatus`` captures the status of a work according to the app.""" - # The name of the work name: str - - # ``True`` when the work is running according to the app. - # Compute states in the cloud are owned by the platform. - is_running: bool + stage: WorkStageStatus + timestamp: float + reason: Optional[str] = None + message: Optional[str] = None + count: int = 1 class AppStatus(BaseModel): diff --git a/src/lightning_app/utilities/enum.py b/src/lightning_app/utilities/enum.py index 11cd7fabc4299..e0d211d29ff27 100644 --- a/src/lightning_app/utilities/enum.py +++ b/src/lightning_app/utilities/enum.py @@ -1,5 +1,4 @@ import enum -from dataclasses import dataclass from datetime import datetime, timezone from typing import Optional @@ -37,7 +36,7 @@ class WorkPendingReason(enum.Enum): REQUESTING_RESOURCE = "requesting_ressource" -class WorkStageStatus: +class WorkStageStatus(str, enum.Enum): NOT_STARTED = "not_started" STARTED = "started" STOPPED = "stopped" @@ -47,18 +46,6 @@ class WorkStageStatus: FAILED = "failed" -@dataclass -class WorkStatus: - stage: WorkStageStatus - timestamp: float - reason: Optional[str] = None - message: Optional[str] = None - count: int = 1 - - def __post_init__(self): - assert self.timestamp > 0 and self.timestamp < (int(datetime.now().timestamp()) + 10) - - def make_status(stage: str, message: Optional[str] = None, reason: Optional[str] = None): status = { "stage": stage, From 1f29503a7832324b7c47350ffe834de7153c5edc Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Thu, 15 Dec 2022 15:55:40 +0000 Subject: [PATCH 04/17] Updates --- .../components/serve/streamlit.py | 5 ++++ src/lightning_app/core/app.py | 4 +-- tests/tests_app/core/test_lightning_api.py | 9 ++++++- tests/tests_app/core/test_lightning_flow.py | 25 ++++++++++++------- 4 files changed, 31 insertions(+), 12 deletions(-) diff --git a/src/lightning_app/components/serve/streamlit.py b/src/lightning_app/components/serve/streamlit.py index 9b943a1708fa3..41a88dafbfe08 100644 --- a/src/lightning_app/components/serve/streamlit.py +++ b/src/lightning_app/components/serve/streamlit.py @@ -4,6 +4,7 @@ import pydoc import subprocess import sys +import time from typing import Any, Callable, Type from lightning_app.core.work import LightningWork @@ -20,6 +21,8 @@ class ServeStreamlit(LightningWork, abc.ABC): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) + self.ready = False + self._process = None @property @@ -38,6 +41,7 @@ def build_model(self) -> Any: return None def run(self) -> None: + time.sleep(100) env = os.environ.copy() env["LIGHTNING_COMPONENT_NAME"] = self.name env["LIGHTNING_WORK"] = self.__class__.__name__ @@ -58,6 +62,7 @@ def run(self) -> None: ], env=env, ) + self.ready = True self._process.wait() def on_exit(self) -> None: diff --git a/src/lightning_app/core/app.py b/src/lightning_app/core/app.py index 29b7d0b10de23..fc44378a1e8e1 100644 --- a/src/lightning_app/core/app.py +++ b/src/lightning_app/core/app.py @@ -493,7 +493,7 @@ def _run(self) -> bool: self._start_with_flow_works() - if self.ready and self.should_publish_changes_to_api and self.api_publish_state_queue: + if self.should_publish_changes_to_api and self.api_publish_state_queue: self.api_publish_state_queue.put(self.state_vars) self._reset_run_time_monitor() @@ -503,7 +503,7 @@ def _run(self) -> bool: self._update_run_time_monitor() - if self.ready and self._has_updated and self.should_publish_changes_to_api and self.api_publish_state_queue: + if self._has_updated and self.should_publish_changes_to_api and self.api_publish_state_queue: self.api_publish_state_queue.put(self.state_vars) self._has_updated = False diff --git a/tests/tests_app/core/test_lightning_api.py b/tests/tests_app/core/test_lightning_api.py index 04b89c927941a..a46eb4e9f9bbd 100644 --- a/tests/tests_app/core/test_lightning_api.py +++ b/tests/tests_app/core/test_lightning_api.py @@ -31,6 +31,7 @@ from lightning_app.runners import MultiProcessRuntime from lightning_app.storage.drive import Drive from lightning_app.testing.helpers import _MockQueue +from lightning_app.utilities.app_status import AppStatus from lightning_app.utilities.component import _set_frontend_context, _set_work_context from lightning_app.utilities.enum import AppStage from lightning_app.utilities.load_app import extract_metadata_from_app @@ -197,7 +198,7 @@ def test_update_publish_state_and_maybe_refresh_ui(): publish_state_queue.put(app.state_with_changes) - thread = UIRefresher(publish_state_queue, api_response_queue) + thread = UIRefresher(publish_state_queue, api_response_queue, None) thread.run_once() assert global_app_state_store.get_app_state("1234") == app.state_with_changes @@ -225,13 +226,16 @@ def get(self, timeout: int = 0): change_state_queue = _MockQueue("change_state_queue") has_started_queue = _MockQueue("has_started_queue") api_response_queue = _MockQueue("api_response_queue") + app_status_queue = _MockQueue("app_status_queue") state = app.state_with_changes publish_state_queue.put(state) + app_status_queue.put(AppStatus(is_ui_ready=True, work_statuses=[])) spec = extract_metadata_from_app(app) ui_refresher = start_server( publish_state_queue, change_state_queue, api_response_queue, + app_status_queue=app_status_queue, has_started_queue=has_started_queue, uvicorn_run=False, spec=spec, @@ -284,6 +288,9 @@ def get(self, timeout: int = 0): {"name": "main_4", "content": "https://te"}, ] + response = await client.get("/api/v1/status") + assert response.json() == {"is_ui_ready": True, "work_statuses": []} + response = await client.post("/api/v1/state", json={"state": new_state}, headers=headers) assert change_state_queue._queue[1].to_dict() == { "values_changed": {"root['vars']['counter']": {"new_value": 1}} diff --git a/tests/tests_app/core/test_lightning_flow.py b/tests/tests_app/core/test_lightning_flow.py index c8e9921f29eec..afbad586c6260 100644 --- a/tests/tests_app/core/test_lightning_flow.py +++ b/tests/tests_app/core/test_lightning_flow.py @@ -5,7 +5,7 @@ from dataclasses import dataclass from functools import partial from time import time -from unittest.mock import ANY, MagicMock +from unittest.mock import ANY import pytest from deepdiff import DeepDiff, Delta @@ -19,7 +19,7 @@ from lightning_app.storage.path import _storage_root_dir from lightning_app.structures import Dict as LDict from lightning_app.structures import List as LList -from lightning_app.testing.helpers import EmptyFlow, EmptyWork +from lightning_app.testing.helpers import _MockQueue, EmptyFlow, EmptyWork from lightning_app.utilities.app_helpers import ( _delta_to_app_state_delta, _LightningAppRef, @@ -887,21 +887,28 @@ def run(self): def test_flow_ready(): - """This test validates the api publish state queue is populated only once ready is True.""" + """This test validates that the app status queue is populated correctly.""" + + mock_queue = _MockQueue("app_status_queue") def run_patch(method): - app.api_publish_state_queue = MagicMock() - app.should_publish_changes_to_api = False + app.app_status_queue = mock_queue method() app = LightningApp(FlowReady()) app._run = partial(run_patch, method=app._run) MultiProcessRuntime(app, start_server=False).dispatch() - # Validates the state has been added only when ready was true. - state = app.api_publish_state_queue.put._mock_call_args[0][0] - call_hash = state["works"]["w"]["calls"]["latest_call_hash"] - assert state["works"]["w"]["calls"][call_hash]["statuses"][0]["stage"] == "succeeded" + # Run the app once to ensure it updates the queues + app.run_once() + + first_status = mock_queue.get() + assert not first_status.is_ui_ready + + last_status = mock_queue.get() + while len(mock_queue) > 0: + last_status = mock_queue.get() + assert last_status.is_ui_ready def test_structures_register_work_cloudcompute(): From 00f86fa1f4d9a44958234620d2fab7587d857530 Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Thu, 15 Dec 2022 16:00:58 +0000 Subject: [PATCH 05/17] Remove debug --- src/lightning_app/components/serve/streamlit.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/lightning_app/components/serve/streamlit.py b/src/lightning_app/components/serve/streamlit.py index 41a88dafbfe08..720139f93f25e 100644 --- a/src/lightning_app/components/serve/streamlit.py +++ b/src/lightning_app/components/serve/streamlit.py @@ -4,7 +4,6 @@ import pydoc import subprocess import sys -import time from typing import Any, Callable, Type from lightning_app.core.work import LightningWork @@ -41,7 +40,6 @@ def build_model(self) -> Any: return None def run(self) -> None: - time.sleep(100) env = os.environ.copy() env["LIGHTNING_COMPONENT_NAME"] = self.name env["LIGHTNING_WORK"] = self.__class__.__name__ From ed6801d71e186223fd17a573e7320880d2367752 Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Thu, 15 Dec 2022 16:03:09 +0000 Subject: [PATCH 06/17] Update comment --- src/lightning_app/core/flow.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/lightning_app/core/flow.py b/src/lightning_app/core/flow.py index 302ba344320d1..25fae44cd29d0 100644 --- a/src/lightning_app/core/flow.py +++ b/src/lightning_app/core/flow.py @@ -255,10 +255,7 @@ def __getattr__(self, item): @property def ready(self) -> bool: - """Not currently enabled. - - Override to customize when your App should be ready. - """ + """Override to customize when your App should be ready.""" flows = self.flows return all(flow.ready for flow in flows.values()) if flows else True From 41ee98708a874b870bec29e1f9054dd09428526d Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Thu, 15 Dec 2022 17:52:36 +0000 Subject: [PATCH 07/17] Update examples/app_boring/app.py Co-authored-by: thomas chaton --- examples/app_boring/app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/app_boring/app.py b/examples/app_boring/app.py index dafa87b5406d2..78a9b1c819f06 100644 --- a/examples/app_boring/app.py +++ b/examples/app_boring/app.py @@ -45,7 +45,7 @@ def __init__(self): @property def ready(self) -> bool: - return self.source_work.has_succeeded and self.dest_work.is_running + return self.dest_work.is_running def run(self): self.source_work.run() From 30f12114d4df5a24014d25ad2624e92b91c41c4f Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Fri, 16 Dec 2022 14:01:20 +0000 Subject: [PATCH 08/17] Remove app status queue --- src/lightning_app/core/api.py | 15 +++-------- src/lightning_app/core/app.py | 13 +++++----- src/lightning_app/core/queues.py | 5 ---- src/lightning_app/runners/backends/backend.py | 1 - src/lightning_app/runners/multiprocess.py | 1 - src/lightning_app/runners/runtime.py | 2 +- src/lightning_app/testing/helpers.py | 3 +++ tests/tests_app/core/test_lightning_api.py | 7 ++---- tests/tests_app/core/test_lightning_flow.py | 25 +++++++++++++------ 9 files changed, 32 insertions(+), 40 deletions(-) diff --git a/src/lightning_app/core/api.py b/src/lightning_app/core/api.py index 8a2b912a6b2b6..73d7274631a69 100644 --- a/src/lightning_app/core/api.py +++ b/src/lightning_app/core/api.py @@ -83,13 +83,11 @@ def __init__( self, api_publish_state_queue, api_response_queue, - app_status_queue: Optional[Queue], refresh_interval: float = 0.1, ) -> None: super().__init__(daemon=True) self.api_publish_state_queue = api_publish_state_queue self.api_response_queue = api_response_queue - self.app_status_queue = app_status_queue self._exit_event = Event() self.refresh_interval = refresh_interval @@ -107,7 +105,8 @@ def run(self): def run_once(self): try: - state = self.api_publish_state_queue.get(timeout=0) + global app_status + state, app_status = self.api_publish_state_queue.get(timeout=0) with lock: global_app_state_store.set_app_state(TEST_SESSION_UUID, state) except queue.Empty: @@ -123,13 +122,6 @@ def run_once(self): except queue.Empty: pass - if self.app_status_queue is not None: - try: - global app_status - app_status = self.app_status_queue.get(timeout=0) - except queue.Empty: - pass - def join(self, timeout: Optional[float] = None) -> None: self._exit_event.set() super().join(timeout) @@ -434,7 +426,6 @@ def start_server( api_publish_state_queue, api_delta_queue, api_response_queue, - app_status_queue: Optional[Queue] = None, has_started_queue: Optional[Queue] = None, host="127.0.0.1", port=8000, @@ -456,7 +447,7 @@ def start_server( global_app_state_store.add(TEST_SESSION_UUID) - refresher = UIRefresher(api_publish_state_queue, api_response_queue, app_status_queue) + refresher = UIRefresher(api_publish_state_queue, api_response_queue) refresher.setDaemon(True) refresher.start() diff --git a/src/lightning_app/core/app.py b/src/lightning_app/core/app.py index fc44378a1e8e1..2826edb47533f 100644 --- a/src/lightning_app/core/app.py +++ b/src/lightning_app/core/app.py @@ -118,7 +118,6 @@ def __init__( self.api_response_queue: Optional[BaseQueue] = None self.api_publish_state_queue: Optional[BaseQueue] = None self.api_delta_queue: Optional[BaseQueue] = None - self.app_status_queue: Optional[BaseQueue] = None self.error_queue: Optional[BaseQueue] = None self.request_queues: Optional[Dict[str, BaseQueue]] = None self.response_queues: Optional[Dict[str, BaseQueue]] = None @@ -153,6 +152,7 @@ def __init__( self.checkpointing: bool = False self._update_layout() + self._update_status() self.is_headless: Optional[bool] = None @@ -494,7 +494,7 @@ def _run(self) -> bool: self._start_with_flow_works() if self.should_publish_changes_to_api and self.api_publish_state_queue: - self.api_publish_state_queue.put(self.state_vars) + self.api_publish_state_queue.put((self.state_vars, self.status)) self._reset_run_time_monitor() @@ -504,7 +504,7 @@ def _run(self) -> bool: self._update_run_time_monitor() if self._has_updated and self.should_publish_changes_to_api and self.api_publish_state_queue: - self.api_publish_state_queue.put(self.state_vars) + self.api_publish_state_queue.put((self.state_vars, self.status)) self._has_updated = False @@ -536,14 +536,13 @@ def _update_status(self) -> None: for work in breadth_first(self.root, types=(lightning_app.LightningWork,)): work_statuses.append(work.status) - new_status = AppStatus( + self.status = AppStatus( is_ui_ready=self.ready, work_statuses=work_statuses, ) - if new_status != old_status: - self.status = new_status - self.app_status_queue.put(self.status) + if self.status != old_status: + self._has_updated = True def _apply_restarting(self) -> bool: self._reset_original_state() diff --git a/src/lightning_app/core/queues.py b/src/lightning_app/core/queues.py index 5d04b5bc9027f..db150a57eb098 100644 --- a/src/lightning_app/core/queues.py +++ b/src/lightning_app/core/queues.py @@ -38,7 +38,6 @@ CALLER_QUEUE_CONSTANT = "CALLER_QUEUE" API_STATE_PUBLISH_QUEUE_CONSTANT = "API_STATE_PUBLISH_QUEUE" API_DELTA_QUEUE_CONSTANT = "API_DELTA_QUEUE" -APP_STATUS_QUEUE_CONSTANT = "APP_STATUS_QUEUE" API_REFRESH_QUEUE_CONSTANT = "API_REFRESH_QUEUE" ORCHESTRATOR_REQUEST_CONSTANT = "ORCHESTRATOR_REQUEST" ORCHESTRATOR_RESPONSE_CONSTANT = "ORCHESTRATOR_RESPONSE" @@ -97,10 +96,6 @@ def get_api_delta_queue(self, queue_id: Optional[str] = None) -> "BaseQueue": queue_name = f"{queue_id}_{DELTA_QUEUE_CONSTANT}" if queue_id else DELTA_QUEUE_CONSTANT return self.get_queue(queue_name) - def get_app_status_queue(self, queue_id: Optional[str] = None) -> "BaseQueue": - queue_name = f"{queue_id}_{APP_STATUS_QUEUE_CONSTANT}" if queue_id else APP_STATUS_QUEUE_CONSTANT - return self.get_queue(queue_name) - def get_orchestrator_request_queue(self, work_name: str, queue_id: Optional[str] = None) -> "BaseQueue": queue_name = ( f"{queue_id}_{ORCHESTRATOR_REQUEST_CONSTANT}_{work_name}" diff --git a/src/lightning_app/runners/backends/backend.py b/src/lightning_app/runners/backends/backend.py index b4114286f8109..54c1f9092bf0f 100644 --- a/src/lightning_app/runners/backends/backend.py +++ b/src/lightning_app/runners/backends/backend.py @@ -88,7 +88,6 @@ def _prepare_queues(self, app: "lightning_app.LightningApp"): app.error_queue = self.queues.get_error_queue(**kw) app.api_publish_state_queue = self.queues.get_api_state_publish_queue(**kw) app.api_delta_queue = app.delta_queue - app.app_status_queue = self.queues.get_app_status_queue(**kw) app.request_queues = {} app.response_queues = {} app.copy_request_queues = {} diff --git a/src/lightning_app/runners/multiprocess.py b/src/lightning_app/runners/multiprocess.py index e951a8092c4fc..e5d34fb76800f 100644 --- a/src/lightning_app/runners/multiprocess.py +++ b/src/lightning_app/runners/multiprocess.py @@ -91,7 +91,6 @@ def dispatch(self, *args: Any, open_ui: bool = True, **kwargs: Any): api_response_queue=self.app.api_response_queue, api_publish_state_queue=self.app.api_publish_state_queue, api_delta_queue=self.app.api_delta_queue, - app_status_queue=self.app.app_status_queue, has_started_queue=has_started_queue, spec=extract_metadata_from_app(self.app), root_path=self.app.root_path, diff --git a/src/lightning_app/runners/runtime.py b/src/lightning_app/runners/runtime.py index ab552f136fde0..d9bf372428894 100644 --- a/src/lightning_app/runners/runtime.py +++ b/src/lightning_app/runners/runtime.py @@ -122,7 +122,7 @@ def terminate(self) -> None: self._add_stopped_status_to_work(work) # Publish the updated state and wait for the frontend to update. - self.app.api_publish_state_queue.put(self.app.state) + self.app.api_publish_state_queue.put((self.app.state, self.app.status)) for thread in self.threads + self.app.threads: thread.join(timeout=0) diff --git a/src/lightning_app/testing/helpers.py b/src/lightning_app/testing/helpers.py index cb21d314145ff..18aae8140d930 100644 --- a/src/lightning_app/testing/helpers.py +++ b/src/lightning_app/testing/helpers.py @@ -136,6 +136,9 @@ def __len__(self): def __repr__(self) -> str: return f"{self.__class__.__name__}({self._queue})" + def __bool__(self) -> bool: + return True + class EmptyFlow(LightningFlow): """A LightningFlow that implements all abstract methods to do nothing. diff --git a/tests/tests_app/core/test_lightning_api.py b/tests/tests_app/core/test_lightning_api.py index a46eb4e9f9bbd..edf5fd7afc852 100644 --- a/tests/tests_app/core/test_lightning_api.py +++ b/tests/tests_app/core/test_lightning_api.py @@ -196,7 +196,7 @@ def test_update_publish_state_and_maybe_refresh_ui(): publish_state_queue = _MockQueue("publish_state_queue") api_response_queue = _MockQueue("api_response_queue") - publish_state_queue.put(app.state_with_changes) + publish_state_queue.put((app.state_with_changes, None)) thread = UIRefresher(publish_state_queue, api_response_queue, None) thread.run_once() @@ -226,16 +226,13 @@ def get(self, timeout: int = 0): change_state_queue = _MockQueue("change_state_queue") has_started_queue = _MockQueue("has_started_queue") api_response_queue = _MockQueue("api_response_queue") - app_status_queue = _MockQueue("app_status_queue") state = app.state_with_changes - publish_state_queue.put(state) - app_status_queue.put(AppStatus(is_ui_ready=True, work_statuses=[])) + publish_state_queue.put((state, AppStatus(is_ui_ready=True, work_statuses=[]))) spec = extract_metadata_from_app(app) ui_refresher = start_server( publish_state_queue, change_state_queue, api_response_queue, - app_status_queue=app_status_queue, has_started_queue=has_started_queue, uvicorn_run=False, spec=spec, diff --git a/tests/tests_app/core/test_lightning_flow.py b/tests/tests_app/core/test_lightning_flow.py index afbad586c6260..57dfb076a7149 100644 --- a/tests/tests_app/core/test_lightning_flow.py +++ b/tests/tests_app/core/test_lightning_flow.py @@ -889,25 +889,34 @@ def run(self): def test_flow_ready(): """This test validates that the app status queue is populated correctly.""" - mock_queue = _MockQueue("app_status_queue") + mock_queue = _MockQueue("api_publish_state_queue") def run_patch(method): - app.app_status_queue = mock_queue + app.should_publish_changes_to_api = True + app.api_publish_state_queue = mock_queue method() + state = {"done": False} + + def lagged_run_once(method): + """Ensure that the full loop is run after the app exits.""" + new_done = method() + if state["done"]: + return True + state["done"] = new_done + return False + app = LightningApp(FlowReady()) app._run = partial(run_patch, method=app._run) + app.run_once = partial(lagged_run_once, method=app.run_once) MultiProcessRuntime(app, start_server=False).dispatch() - # Run the app once to ensure it updates the queues - app.run_once() - - first_status = mock_queue.get() + _, first_status = mock_queue.get() assert not first_status.is_ui_ready - last_status = mock_queue.get() + _, last_status = mock_queue.get() while len(mock_queue) > 0: - last_status = mock_queue.get() + _, last_status = mock_queue.get() assert last_status.is_ui_ready From 312c571c644a6f50238a744499ec1bd6c0f0007c Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Fri, 16 Dec 2022 15:05:01 +0000 Subject: [PATCH 09/17] Fixes --- src/lightning_app/utilities/app_status.py | 4 +--- src/lightning_app/utilities/enum.py | 2 +- tests/tests_app/storage/test_orchestrator.py | 12 ++++++------ tests/tests_app/storage/test_path.py | 2 +- tests/tests_app/utilities/test_proxies.py | 13 +++++++------ 5 files changed, 16 insertions(+), 17 deletions(-) diff --git a/src/lightning_app/utilities/app_status.py b/src/lightning_app/utilities/app_status.py index 95248433d782f..5184fb1ea291c 100644 --- a/src/lightning_app/utilities/app_status.py +++ b/src/lightning_app/utilities/app_status.py @@ -2,14 +2,12 @@ from pydantic import BaseModel -from lightning_app.utilities.enum import WorkStageStatus - class WorkStatus(BaseModel): """The ``WorkStatus`` captures the status of a work according to the app.""" name: str - stage: WorkStageStatus + stage: str timestamp: float reason: Optional[str] = None message: Optional[str] = None diff --git a/src/lightning_app/utilities/enum.py b/src/lightning_app/utilities/enum.py index e0d211d29ff27..4c92ffba3db11 100644 --- a/src/lightning_app/utilities/enum.py +++ b/src/lightning_app/utilities/enum.py @@ -36,7 +36,7 @@ class WorkPendingReason(enum.Enum): REQUESTING_RESOURCE = "requesting_ressource" -class WorkStageStatus(str, enum.Enum): +class WorkStageStatus: NOT_STARTED = "not_started" STARTED = "started" STOPPED = "stopped" diff --git a/tests/tests_app/storage/test_orchestrator.py b/tests/tests_app/storage/test_orchestrator.py index ca671e6b93704..4b391a890f1a9 100644 --- a/tests/tests_app/storage/test_orchestrator.py +++ b/tests/tests_app/storage/test_orchestrator.py @@ -39,7 +39,7 @@ def test_orchestrator(): # orchestrator is now waiting for a response for copier in Work A assert "work_b" in orchestrator.waiting_for_response - assert not request_queues["work_a"] + assert len(request_queues["work_a"]) == 0 assert request in copy_request_queues["work_a"] assert request.destination == "work_b" @@ -54,7 +54,7 @@ def test_orchestrator(): # orchestrator processes confirmation and confirms to the pending request from Work B orchestrator.run_once("work_a") - assert not copy_response_queues["work_a"] + assert len(copy_response_queues["work_a"]) == 0 assert response in response_queues["work_b"] assert not orchestrator.waiting_for_response orchestrator.run_once("work_b") @@ -71,7 +71,7 @@ def test_orchestrator(): assert response.exception is None # all queues should be empty - assert all(not queue for queue in request_queues.values()) - assert all(not queue for queue in response_queues.values()) - assert all(not queue for queue in copy_request_queues.values()) - assert all(not queue for queue in copy_response_queues.values()) + assert all(len(queue) == 0 for queue in request_queues.values()) + assert all(len(queue) == 0 for queue in response_queues.values()) + assert all(len(queue) == 0 for queue in copy_request_queues.values()) + assert all(len(queue) == 0 for queue in copy_response_queues.values()) diff --git a/tests/tests_app/storage/test_path.py b/tests/tests_app/storage/test_path.py index 3cd501f7344c8..2310b8034c303 100644 --- a/tests/tests_app/storage/test_path.py +++ b/tests/tests_app/storage/test_path.py @@ -606,7 +606,7 @@ def test_path_response_not_matching_reqeuest(tmpdir): path.get() # simulate a response that has a different hash than the request had - assert not response_queue + assert len(response_queue) == 0 response.path = str(path) response.hash = "other_hash" response_queue.put(response) diff --git a/tests/tests_app/utilities/test_proxies.py b/tests/tests_app/utilities/test_proxies.py index 4b8a5f25f71e3..a53d8e85a3d37 100644 --- a/tests/tests_app/utilities/test_proxies.py +++ b/tests/tests_app/utilities/test_proxies.py @@ -250,6 +250,7 @@ def __call__(self): state = deepcopy(self.work.state) self.work._calls[call_hash]["statuses"].append( { + "name": self.work.name, "stage": WorkStageStatus.FAILED, "reason": WorkFailureReasons.TIMEOUT, "timestamp": time.time(), @@ -547,7 +548,7 @@ def run(self, use_setattr=False, use_containers=False): # 1. Simulate no state changes ############################## work.run(use_setattr=False, use_containers=False) - assert not delta_queue + assert len(delta_queue) == 0 ############################ # 2. Simulate a setattr call @@ -563,16 +564,16 @@ def run(self, use_setattr=False, use_containers=False): assert len(observer._delta_memory) == 1 # The observer should not trigger any deltas being sent and only consume the delta memory - assert not delta_queue + assert len(delta_queue) == 0 observer.run_once() - assert not delta_queue + assert len(delta_queue) == 0 assert not observer._delta_memory ################################ # 3. Simulate a container update ################################ work.run(use_setattr=False, use_containers=True) - assert not delta_queue + assert len(delta_queue) == 0 assert not observer._delta_memory observer.run_once() observer.run_once() # multiple runs should not affect how many deltas are sent unless there are changes @@ -591,7 +592,7 @@ def run(self, use_setattr=False, use_containers=False): delta = delta_queue.get().delta.to_dict() assert delta == {"values_changed": {"root['vars']['var']": {"new_value": 3}}} - assert not delta_queue + assert len(delta_queue) == 0 assert len(observer._delta_memory) == 1 observer.run_once() @@ -599,7 +600,7 @@ def run(self, use_setattr=False, use_containers=False): assert delta["values_changed"] == {"root['vars']['dict']['counter']": {"new_value": 2}} assert delta["iterable_item_added"] == {"root['vars']['list'][1]": 1} - assert not delta_queue + assert len(delta_queue) == 0 assert not observer._delta_memory From cb6baffbfeb39fcdb447d7e387d9077a5ade5d63 Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Fri, 16 Dec 2022 15:07:25 +0000 Subject: [PATCH 10/17] Update --- src/lightning_app/core/app.py | 4 ++-- src/lightning_app/testing/helpers.py | 3 --- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/src/lightning_app/core/app.py b/src/lightning_app/core/app.py index 2826edb47533f..f23ab3ccc223c 100644 --- a/src/lightning_app/core/app.py +++ b/src/lightning_app/core/app.py @@ -493,7 +493,7 @@ def _run(self) -> bool: self._start_with_flow_works() - if self.should_publish_changes_to_api and self.api_publish_state_queue: + if self.should_publish_changes_to_api and self.api_publish_state_queue is not None: self.api_publish_state_queue.put((self.state_vars, self.status)) self._reset_run_time_monitor() @@ -503,7 +503,7 @@ def _run(self) -> bool: self._update_run_time_monitor() - if self._has_updated and self.should_publish_changes_to_api and self.api_publish_state_queue: + if self._has_updated and self.should_publish_changes_to_api and self.api_publish_state_queue is not None: self.api_publish_state_queue.put((self.state_vars, self.status)) self._has_updated = False diff --git a/src/lightning_app/testing/helpers.py b/src/lightning_app/testing/helpers.py index 18aae8140d930..cb21d314145ff 100644 --- a/src/lightning_app/testing/helpers.py +++ b/src/lightning_app/testing/helpers.py @@ -136,9 +136,6 @@ def __len__(self): def __repr__(self) -> str: return f"{self.__class__.__name__}({self._queue})" - def __bool__(self) -> bool: - return True - class EmptyFlow(LightningFlow): """A LightningFlow that implements all abstract methods to do nothing. From 5c0ec054bf3dbd76bf076b5e1f6dfecfe75f7b40 Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Fri, 16 Dec 2022 15:13:33 +0000 Subject: [PATCH 11/17] Switch to 503 --- src/lightning_app/core/api.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/lightning_app/core/api.py b/src/lightning_app/core/api.py index 73d7274631a69..4a439fa87bd82 100644 --- a/src/lightning_app/core/api.py +++ b/src/lightning_app/core/api.py @@ -339,7 +339,9 @@ async def get_status() -> AppStatus: """Get the current status of the app and works.""" global app_status if app_status is None: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="App status hasn't been reported yet.") + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="App status hasn't been reported yet." + ) return app_status From 1f83edf4a46362c0133453012a0401c148c8f09b Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Fri, 16 Dec 2022 19:31:51 +0000 Subject: [PATCH 12/17] Add comment --- src/lightning_app/core/app.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/lightning_app/core/app.py b/src/lightning_app/core/app.py index f23ab3ccc223c..453206bed8293 100644 --- a/src/lightning_app/core/app.py +++ b/src/lightning_app/core/app.py @@ -541,6 +541,8 @@ def _update_status(self) -> None: work_statuses=work_statuses, ) + # If the work statuses changed, the state delta will trigger an update. + # If ready has changed, we trigger an update manually. if self.status != old_status: self._has_updated = True From 7f6df16d3b319215f8e65c2420c79f1b87c3803b Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Mon, 19 Dec 2022 11:03:09 +0000 Subject: [PATCH 13/17] Cleanup --- tests/tests_app/core/test_lightning_api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/tests_app/core/test_lightning_api.py b/tests/tests_app/core/test_lightning_api.py index edf5fd7afc852..1117eeff4df02 100644 --- a/tests/tests_app/core/test_lightning_api.py +++ b/tests/tests_app/core/test_lightning_api.py @@ -198,7 +198,7 @@ def test_update_publish_state_and_maybe_refresh_ui(): publish_state_queue.put((app.state_with_changes, None)) - thread = UIRefresher(publish_state_queue, api_response_queue, None) + thread = UIRefresher(publish_state_queue, api_response_queue) thread.run_once() assert global_app_state_store.get_app_state("1234") == app.state_with_changes From 593f86799ae534ef38917312298fa5ccf2b37ffe Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Mon, 19 Dec 2022 11:20:29 +0000 Subject: [PATCH 14/17] Updates --- src/lightning_app/core/app.py | 4 ++-- src/lightning_app/core/work.py | 4 ++-- src/lightning_app/utilities/app_status.py | 5 ++--- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/lightning_app/core/app.py b/src/lightning_app/core/app.py index 453206bed8293..dfce2097ec5a0 100644 --- a/src/lightning_app/core/app.py +++ b/src/lightning_app/core/app.py @@ -532,9 +532,9 @@ def _update_is_headless(self) -> None: def _update_status(self) -> None: old_status = self.status - work_statuses = [] + work_statuses = {} for work in breadth_first(self.root, types=(lightning_app.LightningWork,)): - work_statuses.append(work.status) + work_statuses[work.name] = work.status self.status = AppStatus( is_ui_ready=self.ready, diff --git a/src/lightning_app/core/work.py b/src/lightning_app/core/work.py index 84838f76cedb0..d99df13d4f933 100644 --- a/src/lightning_app/core/work.py +++ b/src/lightning_app/core/work.py @@ -309,8 +309,8 @@ def status(self) -> WorkStatus: latest_status = statuses[-1] if latest_status.get("reason") == WorkFailureReasons.TIMEOUT: return self._aggregate_status_timeout(statuses) - return WorkStatus(name=self.name, **latest_status) - return WorkStatus(name=self.name, stage=WorkStageStatus.NOT_STARTED, timestamp=time.time()) + return WorkStatus(**latest_status) + return WorkStatus(stage=WorkStageStatus.NOT_STARTED, timestamp=time.time()) @property def statuses(self) -> List[WorkStatus]: diff --git a/src/lightning_app/utilities/app_status.py b/src/lightning_app/utilities/app_status.py index 5184fb1ea291c..92a474a7918eb 100644 --- a/src/lightning_app/utilities/app_status.py +++ b/src/lightning_app/utilities/app_status.py @@ -1,4 +1,4 @@ -from typing import List, Optional +from typing import Dict, Optional from pydantic import BaseModel @@ -6,7 +6,6 @@ class WorkStatus(BaseModel): """The ``WorkStatus`` captures the status of a work according to the app.""" - name: str stage: str timestamp: float reason: Optional[str] = None @@ -21,4 +20,4 @@ class AppStatus(BaseModel): is_ui_ready: bool # The statuses of ``LightningWork`` objects currently associated with this app - work_statuses: List[WorkStatus] + work_statuses: Dict[str, WorkStatus] From c3c54535e58ceb0490418cbb48ddff3c42fc84bc Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Mon, 19 Dec 2022 11:32:06 +0000 Subject: [PATCH 15/17] Fix --- tests/tests_app/core/test_lightning_api.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/tests_app/core/test_lightning_api.py b/tests/tests_app/core/test_lightning_api.py index 56272b4218726..adad9fba932e0 100644 --- a/tests/tests_app/core/test_lightning_api.py +++ b/tests/tests_app/core/test_lightning_api.py @@ -227,7 +227,7 @@ def get(self, timeout: int = 0): has_started_queue = _MockQueue("has_started_queue") api_response_queue = _MockQueue("api_response_queue") state = app.state_with_changes - publish_state_queue.put((state, AppStatus(is_ui_ready=True, work_statuses=[]))) + publish_state_queue.put((state, AppStatus(is_ui_ready=True, work_statuses={}))) spec = extract_metadata_from_app(app) ui_refresher = start_server( publish_state_queue, @@ -286,7 +286,7 @@ def get(self, timeout: int = 0): ] response = await client.get("/api/v1/status") - assert response.json() == {"is_ui_ready": True, "work_statuses": []} + assert response.json() == {"is_ui_ready": True, "work_statuses": {}} response = await client.post("/api/v1/state", json={"state": new_state}, headers=headers) assert change_state_queue._queue[1].to_dict() == { From 4460b0be2c34776e8e971f38e60fee3776ebf573 Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Mon, 19 Dec 2022 12:24:24 +0000 Subject: [PATCH 16/17] Add back assertion --- src/lightning_app/utilities/app_status.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/lightning_app/utilities/app_status.py b/src/lightning_app/utilities/app_status.py index 92a474a7918eb..67a8a31f0eac8 100644 --- a/src/lightning_app/utilities/app_status.py +++ b/src/lightning_app/utilities/app_status.py @@ -1,3 +1,4 @@ +from datetime import datetime from typing import Dict, Optional from pydantic import BaseModel @@ -12,6 +13,11 @@ class WorkStatus(BaseModel): message: Optional[str] = None count: int = 1 + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + + assert self.timestamp > 0 and self.timestamp < (int(datetime.now().timestamp()) + 10) + class AppStatus(BaseModel): """The ``AppStatus`` captures the current status of the app and its components.""" From 0d693eff5d1e0ecec1f1c99b430fa373c7f4c01d Mon Sep 17 00:00:00 2001 From: Ethan Harris Date: Mon, 19 Dec 2022 12:42:55 +0000 Subject: [PATCH 17/17] Typing --- src/lightning_app/utilities/app_status.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lightning_app/utilities/app_status.py b/src/lightning_app/utilities/app_status.py index 67a8a31f0eac8..232c3f0b65210 100644 --- a/src/lightning_app/utilities/app_status.py +++ b/src/lightning_app/utilities/app_status.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import Dict, Optional +from typing import Any, Dict, Optional from pydantic import BaseModel @@ -13,7 +13,7 @@ class WorkStatus(BaseModel): message: Optional[str] = None count: int = 1 - def __init__(self, *args, **kwargs) -> None: + def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) assert self.timestamp > 0 and self.timestamp < (int(datetime.now().timestamp()) + 10)