Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

[App] Add status endpoint, enable ready #16075

Merged
merged 20 commits into from Dec 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions examples/app_boring/app.py
Expand Up @@ -43,6 +43,10 @@ def __init__(self):
raise_exception=True,
)

@property
def ready(self) -> bool:
ethanwharris marked this conversation as resolved.
Show resolved Hide resolved
return self.dest_work.is_running

def run(self):
self.source_work.run()
if self.source_work.has_succeeded:
Expand Down
3 changes: 3 additions & 0 deletions src/lightning_app/components/serve/streamlit.py
Expand Up @@ -20,6 +20,8 @@ class ServeStreamlit(LightningWork, abc.ABC):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

self.ready = False

self._process = None

@property
Expand Down Expand Up @@ -58,6 +60,7 @@ def run(self) -> None:
],
env=env,
)
self.ready = True
self._process.wait()

def on_exit(self) -> None:
Expand Down
25 changes: 22 additions & 3 deletions src/lightning_app/core/api.py
Expand Up @@ -34,6 +34,7 @@
from lightning_app.core.queues import QueuingSystem
from lightning_app.storage import Drive
from lightning_app.utilities.app_helpers import InMemoryStateStore, Logger, StateStore
from lightning_app.utilities.app_status import AppStatus
from lightning_app.utilities.cloud import is_running_in_cloud
from lightning_app.utilities.component import _context
from lightning_app.utilities.enum import ComponentContext, OpenAPITags
Expand Down Expand Up @@ -66,18 +67,24 @@ class SessionMiddleware:
lock = Lock()

app_spec: Optional[List] = None
app_status: Optional[AppStatus] = None

# In the future, this would be abstracted to support horizontal scaling.
responses_store = {}

logger = Logger(__name__)


# This can be replaced with a consumer that publishes states in a kv-store
# in a serverless architecture


class UIRefresher(Thread):
def __init__(self, api_publish_state_queue, api_response_queue, refresh_interval: float = 0.1) -> None:
def __init__(
self,
api_publish_state_queue,
api_response_queue,
refresh_interval: float = 0.1,
) -> None:
super().__init__(daemon=True)
self.api_publish_state_queue = api_publish_state_queue
self.api_response_queue = api_response_queue
Expand All @@ -98,7 +105,8 @@ def run(self):

def run_once(self):
try:
state = self.api_publish_state_queue.get(timeout=0)
global app_status
state, app_status = self.api_publish_state_queue.get(timeout=0)
with lock:
global_app_state_store.set_app_state(TEST_SESSION_UUID, state)
except queue.Empty:
Expand Down Expand Up @@ -326,6 +334,17 @@ async def upload_file(response: Response, filename: str, uploaded_file: UploadFi
return f"Successfully uploaded '{filename}' to the Drive"


@fastapi_service.get("/api/v1/status", response_model=AppStatus)
async def get_status() -> AppStatus:
"""Get the current status of the app and works."""
global app_status
if app_status is None:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="App status hasn't been reported yet."
)
return app_status


@fastapi_service.get("/healthz", status_code=200)
async def healthz(response: Response):
"""Health check endpoint used in the cloud FastAPI servers to check the status periodically."""
Expand Down
38 changes: 26 additions & 12 deletions src/lightning_app/core/app.py
Expand Up @@ -35,6 +35,7 @@
_should_dispatch_app,
Logger,
)
from lightning_app.utilities.app_status import AppStatus
from lightning_app.utilities.commands.base import _process_requests
from lightning_app.utilities.component import _convert_paths_after_init, _validate_root_flow
from lightning_app.utilities.enum import AppStage, CacheCallsKeys
Expand Down Expand Up @@ -140,6 +141,7 @@ def __init__(
self.exception = None
self.collect_changes: bool = True

self.status: Optional[AppStatus] = None
# TODO: Enable ready locally for opening the UI.
self.ready = False

Expand All @@ -150,6 +152,7 @@ def __init__(
self.checkpointing: bool = False

self._update_layout()
self._update_status()

self.is_headless: Optional[bool] = None

Expand Down Expand Up @@ -418,6 +421,7 @@ def run_once(self):

self._update_layout()
self._update_is_headless()
self._update_status()
self.maybe_apply_changes()

if self.checkpointing and self._should_snapshot():
Expand Down Expand Up @@ -485,19 +489,12 @@ def _run(self) -> bool:
self._original_state = deepcopy(self.state)
done = False

# TODO: Re-enable the `ready` property once issues are resolved
if not self.root.ready:
warnings.warn(
"One of your Flows returned `.ready` as `False`. "
"This feature is not yet enabled so this will be ignored.",
UserWarning,
)
self.ready = True
self.ready = self.root.ready

self._start_with_flow_works()

if self.ready and self.should_publish_changes_to_api and self.api_publish_state_queue:
self.api_publish_state_queue.put(self.state_vars)
if self.should_publish_changes_to_api and self.api_publish_state_queue is not None:
self.api_publish_state_queue.put((self.state_vars, self.status))

self._reset_run_time_monitor()

Expand All @@ -506,8 +503,8 @@ def _run(self) -> bool:

self._update_run_time_monitor()

if self.ready and self._has_updated and self.should_publish_changes_to_api and self.api_publish_state_queue:
self.api_publish_state_queue.put(self.state_vars)
if self._has_updated and self.should_publish_changes_to_api and self.api_publish_state_queue is not None:
self.api_publish_state_queue.put((self.state_vars, self.status))

self._has_updated = False

Expand All @@ -532,6 +529,23 @@ def _update_is_headless(self) -> None:
# This ensures support for apps which dynamically add a UI at runtime.
_handle_is_headless(self)

def _update_status(self) -> None:
old_status = self.status

work_statuses = {}
for work in breadth_first(self.root, types=(lightning_app.LightningWork,)):
work_statuses[work.name] = work.status

self.status = AppStatus(
is_ui_ready=self.ready,
work_statuses=work_statuses,
)

# If the work statuses changed, the state delta will trigger an update.
# If ready has changed, we trigger an update manually.
if self.status != old_status:
tchaton marked this conversation as resolved.
Show resolved Hide resolved
self._has_updated = True

def _apply_restarting(self) -> bool:
self._reset_original_state()
# apply stage after restoring the original state.
Expand Down
5 changes: 1 addition & 4 deletions src/lightning_app/core/flow.py
Expand Up @@ -249,10 +249,7 @@ def __getattr__(self, item):

@property
def ready(self) -> bool:
"""Not currently enabled.

Override to customize when your App should be ready.
"""
"""Override to customize when your App should be ready."""
flows = self.flows
return all(flow.ready for flow in flows.values()) if flows else True

Expand Down
2 changes: 1 addition & 1 deletion src/lightning_app/core/work.py
Expand Up @@ -12,13 +12,13 @@
from lightning_app.storage.drive import _maybe_create_drive, Drive
from lightning_app.storage.payload import Payload
from lightning_app.utilities.app_helpers import _is_json_serializable, _LightningAppRef, is_overridden
from lightning_app.utilities.app_status import WorkStatus
from lightning_app.utilities.component import _is_flow_context, _sanitize_state
from lightning_app.utilities.enum import (
CacheCallsKeys,
make_status,
WorkFailureReasons,
WorkStageStatus,
WorkStatus,
WorkStopReasons,
)
from lightning_app.utilities.exceptions import LightningWorkException
Expand Down
2 changes: 1 addition & 1 deletion src/lightning_app/runners/runtime.py
Expand Up @@ -121,7 +121,7 @@ def terminate(self) -> None:
self._add_stopped_status_to_work(work)

# Publish the updated state and wait for the frontend to update.
self.app.api_publish_state_queue.put(self.app.state)
self.app.api_publish_state_queue.put((self.app.state, self.app.status))

for thread in self.threads + self.app.threads:
thread.join(timeout=0)
Expand Down
29 changes: 29 additions & 0 deletions src/lightning_app/utilities/app_status.py
@@ -0,0 +1,29 @@
from datetime import datetime
from typing import Any, Dict, Optional

from pydantic import BaseModel


class WorkStatus(BaseModel):
"""The ``WorkStatus`` captures the status of a work according to the app."""

stage: str
timestamp: float
reason: Optional[str] = None
message: Optional[str] = None
count: int = 1

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)

assert self.timestamp > 0 and self.timestamp < (int(datetime.now().timestamp()) + 10)


class AppStatus(BaseModel):
"""The ``AppStatus`` captures the current status of the app and its components."""

# ``True`` when the app UI is ready to be viewed
is_ui_ready: bool

# The statuses of ``LightningWork`` objects currently associated with this app
work_statuses: Dict[str, WorkStatus]
13 changes: 0 additions & 13 deletions src/lightning_app/utilities/enum.py
@@ -1,5 +1,4 @@
import enum
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

Expand Down Expand Up @@ -47,18 +46,6 @@ class WorkStageStatus:
FAILED = "failed"


@dataclass
class WorkStatus:
stage: WorkStageStatus
timestamp: float
reason: Optional[str] = None
message: Optional[str] = None
count: int = 1

def __post_init__(self):
assert self.timestamp > 0 and self.timestamp < (int(datetime.now().timestamp()) + 10)


def make_status(stage: str, message: Optional[str] = None, reason: Optional[str] = None):
status = {
"stage": stage,
Expand Down
8 changes: 6 additions & 2 deletions tests/tests_app/core/test_lightning_api.py
Expand Up @@ -31,6 +31,7 @@
from lightning_app.runners import MultiProcessRuntime
from lightning_app.storage.drive import Drive
from lightning_app.testing.helpers import _MockQueue
from lightning_app.utilities.app_status import AppStatus
from lightning_app.utilities.component import _set_frontend_context, _set_work_context
from lightning_app.utilities.enum import AppStage
from lightning_app.utilities.load_app import extract_metadata_from_app
Expand Down Expand Up @@ -195,7 +196,7 @@ def test_update_publish_state_and_maybe_refresh_ui():
publish_state_queue = _MockQueue("publish_state_queue")
api_response_queue = _MockQueue("api_response_queue")

publish_state_queue.put(app.state_with_changes)
publish_state_queue.put((app.state_with_changes, None))

thread = UIRefresher(publish_state_queue, api_response_queue)
thread.run_once()
Expand Down Expand Up @@ -226,7 +227,7 @@ def get(self, timeout: int = 0):
has_started_queue = _MockQueue("has_started_queue")
api_response_queue = _MockQueue("api_response_queue")
state = app.state_with_changes
publish_state_queue.put(state)
publish_state_queue.put((state, AppStatus(is_ui_ready=True, work_statuses={})))
spec = extract_metadata_from_app(app)
ui_refresher = start_server(
publish_state_queue,
Expand Down Expand Up @@ -284,6 +285,9 @@ def get(self, timeout: int = 0):
{"name": "main_4", "content": "https://te"},
]

response = await client.get("/api/v1/status")
assert response.json() == {"is_ui_ready": True, "work_statuses": {}}

response = await client.post("/api/v1/state", json={"state": new_state}, headers=headers)
assert change_state_queue._queue[1].to_dict() == {
"values_changed": {"root['vars']['counter']": {"new_value": 1}}
Expand Down
34 changes: 25 additions & 9 deletions tests/tests_app/core/test_lightning_flow.py
Expand Up @@ -5,7 +5,7 @@
from dataclasses import dataclass
from functools import partial
from time import time
from unittest.mock import ANY, MagicMock
from unittest.mock import ANY

import pytest
from deepdiff import DeepDiff, Delta
Expand All @@ -19,7 +19,7 @@
from lightning_app.storage.path import _storage_root_dir
from lightning_app.structures import Dict as LDict
from lightning_app.structures import List as LList
from lightning_app.testing.helpers import EmptyFlow, EmptyWork
from lightning_app.testing.helpers import _MockQueue, EmptyFlow, EmptyWork
from lightning_app.utilities.app_helpers import (
_delta_to_app_state_delta,
_LightningAppRef,
Expand Down Expand Up @@ -891,21 +891,37 @@ def run(self):


def test_flow_ready():
"""This test validates the api publish state queue is populated only once ready is True."""
"""This test validates that the app status queue is populated correctly."""

mock_queue = _MockQueue("api_publish_state_queue")

def run_patch(method):
app.api_publish_state_queue = MagicMock()
app.should_publish_changes_to_api = False
app.should_publish_changes_to_api = True
app.api_publish_state_queue = mock_queue
method()

state = {"done": False}

def lagged_run_once(method):
"""Ensure that the full loop is run after the app exits."""
new_done = method()
if state["done"]:
return True
state["done"] = new_done
return False

app = LightningApp(FlowReady())
app._run = partial(run_patch, method=app._run)
app.run_once = partial(lagged_run_once, method=app.run_once)
MultiProcessRuntime(app, start_server=False).dispatch()

# Validates the state has been added only when ready was true.
state = app.api_publish_state_queue.put._mock_call_args[0][0]
call_hash = state["works"]["w"]["calls"]["latest_call_hash"]
assert state["works"]["w"]["calls"][call_hash]["statuses"][0]["stage"] == "succeeded"
_, first_status = mock_queue.get()
assert not first_status.is_ui_ready

_, last_status = mock_queue.get()
while len(mock_queue) > 0:
_, last_status = mock_queue.get()
assert last_status.is_ui_ready


def test_structures_register_work_cloudcompute():
Expand Down