Skip to content

Commit

Permalink
[App] Remove SingleProcessRuntime (#15933)
Browse files Browse the repository at this point in the history
* Remove SingleProcessRuntime
* Remove unused queues
* Docs
  • Loading branch information
ethanwharris committed Dec 7, 2022
1 parent 06163e6 commit e250dfe
Show file tree
Hide file tree
Showing 21 changed files with 92 additions and 209 deletions.
1 change: 0 additions & 1 deletion docs/source-app/api_reference/runners.rst
Expand Up @@ -18,5 +18,4 @@ ______________
:template: classtemplate.rst

~cloud.CloudRuntime
~singleprocess.SingleProcessRuntime
~multiprocess.MultiProcessRuntime
1 change: 0 additions & 1 deletion docs/source-app/api_references.rst
Expand Up @@ -89,7 +89,6 @@ _______
:template: classtemplate_no_index.rst

~cloud.CloudRuntime
~singleprocess.SingleProcessRuntime
~multiprocess.MultiProcessRuntime

----
Expand Down
3 changes: 0 additions & 3 deletions docs/source-app/testing.rst
Expand Up @@ -120,7 +120,6 @@ We provide ``application_testing`` as a helper funtion to get your application u
os.path.join(_PROJECT_ROOT, "examples/app_v0/app.py"),
"--blocking",
"False",
"--multiprocess",
"--open-ui",
"False",
]
Expand All @@ -129,9 +128,7 @@ First in the list for ``command_line`` is the location of your script. It is an

Next there are a couple of options you can leverage:


* ``blocking`` - Blocking is an app status that says "Do not run until I click run in the UI". For our integration test, since we are not using the UI, we are setting this to "False".
* ``multiprocess/singleprocess`` - This is the runtime your app is expected to run under.
* ``open-ui`` - We set this to false since this is the routine that opens a browser for your local execution.

Once you have your commandline ready, you will then be able to kick off the test and gather results:
Expand Down
8 changes: 4 additions & 4 deletions examples/app_template_streamlit_ui/app.py
@@ -1,8 +1,8 @@
import logging

from lightning_app import LightningApp, LightningFlow
from lightning_app.frontend import StreamlitFrontend
from lightning_app.utilities.state import AppState
from lightning.app import LightningApp, LightningFlow
from lightning.app.frontend import StreamlitFrontend
from lightning.app.utilities.state import AppState

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -45,4 +45,4 @@ def configure_layout(self):
return [{"name": "StreamLitUI", "content": self.streamlit_ui}]


app = LightningApp(HelloWorld(), log_level="debug")
app = LightningApp(HelloWorld())
1 change: 0 additions & 1 deletion pyproject.toml
Expand Up @@ -102,7 +102,6 @@ module = [
"lightning_app.runners.cloud",
"lightning_app.runners.multiprocess",
"lightning_app.runners.runtime",
"lightning_app.runners.singleprocess",
"lightning_app.source_code.copytree",
"lightning_app.source_code.hashing",
"lightning_app.source_code.local",
Expand Down
2 changes: 1 addition & 1 deletion src/lightning_app/CHANGELOG.md
Expand Up @@ -43,7 +43,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).

### Removed

-
- Removed the `SingleProcessRuntime` ([#15933](https://github.com/Lightning-AI/lightning/pull/15933))


### Fixed
Expand Down
2 changes: 1 addition & 1 deletion src/lightning_app/components/auto_scaler.py
Expand Up @@ -25,7 +25,6 @@
from lightning_app.utilities.packaging.cloud_compute import CloudCompute

logger = Logger(__name__)
lock = asyncio.Lock()


def _raise_granular_exception(exception: Exception) -> None:
Expand Down Expand Up @@ -209,6 +208,7 @@ async def process_request(self, data: BaseModel):
def run(self):

logger.info(f"servers: {self.servers}")
lock = asyncio.Lock()

self._iter = cycle(self.servers)
self._last_batch_sent = time.time()
Expand Down
4 changes: 1 addition & 3 deletions src/lightning_app/core/app.py
Expand Up @@ -21,7 +21,7 @@
FRONTEND_DIR,
STATE_ACCUMULATE_WAIT,
)
from lightning_app.core.queues import BaseQueue, SingleProcessQueue
from lightning_app.core.queues import BaseQueue
from lightning_app.core.work import LightningWork
from lightning_app.frontend import Frontend
from lightning_app.storage import Drive, Path, Payload
Expand Down Expand Up @@ -549,8 +549,6 @@ def _collect_work_finish_status(self) -> dict:
def _should_snapshot(self) -> bool:
if len(self.works) == 0:
return True
elif isinstance(self.delta_queue, SingleProcessQueue):
return True
elif self._has_updated:
work_finished_status = self._collect_work_finish_status()
if work_finished_status:
Expand Down
20 changes: 1 addition & 19 deletions src/lightning_app/core/queues.py
Expand Up @@ -49,7 +49,6 @@


class QueuingSystem(Enum):
SINGLEPROCESS = "singleprocess"
MULTIPROCESS = "multiprocess"
REDIS = "redis"
HTTP = "http"
Expand All @@ -59,10 +58,8 @@ def get_queue(self, queue_name: str) -> "BaseQueue":
return MultiProcessQueue(queue_name, default_timeout=STATE_UPDATE_TIMEOUT)
elif self == QueuingSystem.REDIS:
return RedisQueue(queue_name, default_timeout=REDIS_QUEUES_READ_DEFAULT_TIMEOUT)
elif self == QueuingSystem.HTTP:
return HTTPQueue(queue_name, default_timeout=STATE_UPDATE_TIMEOUT)
else:
return SingleProcessQueue(queue_name, default_timeout=STATE_UPDATE_TIMEOUT)
return HTTPQueue(queue_name, default_timeout=STATE_UPDATE_TIMEOUT)

def get_api_response_queue(self, queue_id: Optional[str] = None) -> "BaseQueue":
queue_name = f"{queue_id}_{API_RESPONSE_QUEUE_CONSTANT}" if queue_id else API_RESPONSE_QUEUE_CONSTANT
Expand Down Expand Up @@ -179,21 +176,6 @@ def is_running(self) -> bool:
return True


class SingleProcessQueue(BaseQueue):
def __init__(self, name: str, default_timeout: float):
self.name = name
self.default_timeout = default_timeout
self.queue = queue.Queue()

def put(self, item):
self.queue.put(item)

def get(self, timeout: int = None):
if timeout == 0:
timeout = self.default_timeout
return self.queue.get(timeout=timeout, block=(timeout is None))


class MultiProcessQueue(BaseQueue):
def __init__(self, name: str, default_timeout: float):
self.name = name
Expand Down
2 changes: 0 additions & 2 deletions src/lightning_app/runners/__init__.py
@@ -1,7 +1,6 @@
from lightning_app.runners.cloud import CloudRuntime
from lightning_app.runners.multiprocess import MultiProcessRuntime
from lightning_app.runners.runtime import dispatch, Runtime
from lightning_app.runners.singleprocess import SingleProcessRuntime
from lightning_app.utilities.app_commands import run_app_commands
from lightning_app.utilities.load_app import load_app_from_file

Expand All @@ -11,6 +10,5 @@
"run_app_commands",
"Runtime",
"MultiProcessRuntime",
"SingleProcessRuntime",
"CloudRuntime",
]
7 changes: 2 additions & 5 deletions src/lightning_app/runners/runtime_type.py
@@ -1,21 +1,18 @@
from enum import Enum
from typing import Type, TYPE_CHECKING

from lightning_app.runners import CloudRuntime, MultiProcessRuntime, SingleProcessRuntime
from lightning_app.runners import CloudRuntime, MultiProcessRuntime

if TYPE_CHECKING:
from lightning_app.runners.runtime import Runtime


class RuntimeType(Enum):
SINGLEPROCESS = "singleprocess"
MULTIPROCESS = "multiprocess"
CLOUD = "cloud"

def get_runtime(self) -> Type["Runtime"]:
if self == RuntimeType.SINGLEPROCESS:
return SingleProcessRuntime
elif self == RuntimeType.MULTIPROCESS:
if self == RuntimeType.MULTIPROCESS:
return MultiProcessRuntime
elif self == RuntimeType.CLOUD:
return CloudRuntime
Expand Down
62 changes: 0 additions & 62 deletions src/lightning_app/runners/singleprocess.py

This file was deleted.

7 changes: 0 additions & 7 deletions src/lightning_app/utilities/app_helpers.py
Expand Up @@ -130,13 +130,6 @@ def set_served_session_id(self, k, v):
self.store[k].session_id = v


class DistributedMode(enum.Enum):
SINGLEPROCESS = enum.auto()
MULTIPROCESS = enum.auto()
CONTAINER = enum.auto()
GRID = enum.auto()


class _LightningAppRef:
_app_instance: Optional["LightningApp"] = None

Expand Down
29 changes: 20 additions & 9 deletions src/lightning_app/utilities/state.py
Expand Up @@ -2,6 +2,7 @@
import json
import os
from copy import deepcopy
from time import sleep
from typing import Any, Dict, List, Optional, Tuple, Union

from deepdiff import DeepDiff
Expand Down Expand Up @@ -149,16 +150,26 @@ def _request_state(self) -> None:
return
app_url = f"{self._url}/api/v1/state"
headers = headers_for(self._plugin.get_context()) if self._plugin else {}
try:
response = self._session.get(app_url, headers=headers, timeout=1)
except ConnectionError as e:
raise AttributeError("Failed to connect and fetch the app state. Is the app running?") from e

self._authorized = response.status_code
if self._authorized != 200:
return
logger.debug(f"GET STATE {response} {response.json()}")
self._store_state(response.json())
response_json = {}

# Sometimes the state URL can return an empty JSON when things are being set-up,
# so we wait for it to be ready here.
while response_json == {}:
sleep(0.5)
try:
response = self._session.get(app_url, headers=headers, timeout=1)
except ConnectionError as e:
raise AttributeError("Failed to connect and fetch the app state. Is the app running?") from e

self._authorized = response.status_code
if self._authorized != 200:
return

response_json = response.json()

logger.debug(f"GET STATE {response} {response_json}")
self._store_state(response_json)

def __getattr__(self, name: str) -> Union[Any, "AppState"]:
if name in self._APP_PRIVATE_KEYS:
Expand Down
20 changes: 7 additions & 13 deletions tests/tests_app/core/test_lightning_api.py
Expand Up @@ -28,7 +28,7 @@
UIRefresher,
)
from lightning_app.core.constants import APP_SERVER_PORT
from lightning_app.runners import MultiProcessRuntime, SingleProcessRuntime
from lightning_app.runners import MultiProcessRuntime
from lightning_app.storage.drive import Drive
from lightning_app.testing.helpers import _MockQueue
from lightning_app.utilities.component import _set_frontend_context, _set_work_context
Expand Down Expand Up @@ -71,12 +71,10 @@ def run(self):
self.work_a.run()


# TODO: Resolve singleprocess - idea: explore frame calls recursively.
@pytest.mark.parametrize("runtime_cls", [MultiProcessRuntime])
def test_app_state_api(runtime_cls):
def test_app_state_api():
"""This test validates the AppState can properly broadcast changes from work within its own process."""
app = LightningApp(_A(), log_level="debug")
runtime_cls(app, start_server=True).dispatch()
MultiProcessRuntime(app, start_server=True).dispatch()
assert app.root.work_a.var_a == -1
_set_work_context()
assert app.root.work_a.drive.list(".") == ["test_app_state_api.txt"]
Expand Down Expand Up @@ -105,13 +103,10 @@ def run(self):
self._exit()


# TODO: Find why this test is flaky.
@pytest.mark.skip(reason="flaky test.")
@pytest.mark.parametrize("runtime_cls", [SingleProcessRuntime])
def test_app_state_api_with_flows(runtime_cls, tmpdir):
def test_app_state_api_with_flows(tmpdir):
"""This test validates the AppState can properly broadcast changes from flows."""
app = LightningApp(A2(), log_level="debug")
runtime_cls(app, start_server=True).dispatch()
MultiProcessRuntime(app, start_server=True).dispatch()
assert app.root.var_a == -1


Expand Down Expand Up @@ -181,13 +176,12 @@ def maybe_apply_changes(self):

# FIXME: This test doesn't assert anything
@pytest.mark.skip(reason="TODO: Resolve flaky test.")
@pytest.mark.parametrize("runtime_cls", [SingleProcessRuntime, MultiProcessRuntime])
def test_app_stage_from_frontend(runtime_cls):
def test_app_stage_from_frontend():
"""This test validates that delta from the `api_delta_queue` manipulating the ['app_state']['stage'] would
start and stop the app."""
app = AppStageTestingApp(FlowA(), log_level="debug")
app.stage = AppStage.BLOCKING
runtime_cls(app, start_server=True).dispatch()
MultiProcessRuntime(app, start_server=True).dispatch()


def test_update_publish_state_and_maybe_refresh_ui():
Expand Down

0 comments on commit e250dfe

Please sign in to comment.