diff --git a/.github/workflows/ci-lite-tests.yml b/.github/workflows/ci-lite-tests.yml deleted file mode 100644 index bc25b4982743d9..00000000000000 --- a/.github/workflows/ci-lite-tests.yml +++ /dev/null @@ -1,159 +0,0 @@ -name: Test Lite - -# see: https://help.github.com/en/actions/reference/events-that-trigger-workflows -on: - push: - branches: [master, "release/*"] - pull_request: - branches: [master, "release/*"] - types: [opened, reopened, ready_for_review, synchronize] # added `ready_for_review` since draft is skipped - paths: - - ".actions/**" - - "requirements/lite/**" - - "src/lightning_lite/**" - - "tests/tests_lite/**" - - "setup.cfg" # includes pytest config - - ".github/workflows/ci-lite-tests.yml" - - "!requirements/*/docs.txt" - - "!*.md" - - "!**/*.md" - -concurrency: - group: ${{ github.workflow }}-${{ github.ref }}-${{ github.head_ref }} - cancel-in-progress: ${{ ! (github.ref == 'refs/heads/master' || startsWith(github.ref, 'refs/heads/release/')) }} - -env: - FREEZE_REQUIREMENTS: 1 - -defaults: - run: - shell: bash - -jobs: - lite-cpu: - runs-on: ${{ matrix.os }} - if: github.event.pull_request.draft == false - strategy: - fail-fast: false - matrix: - include: - # assign python and pytorch version combinations to operating systems (arbitrarily) - # note: there's no distribution of torch==1.10 for Python>=3.10 - - {os: "macOS-11", pkg-name: "lite", python-version: "3.8", pytorch-version: "1.11"} - - {os: "macOS-11", pkg-name: "lite", python-version: "3.9", pytorch-version: "1.12"} - - {os: "ubuntu-20.04", pkg-name: "lite", python-version: "3.8", pytorch-version: "1.10"} - - {os: "ubuntu-20.04", pkg-name: "lite", python-version: "3.9", pytorch-version: "1.11"} - - {os: "ubuntu-20.04", pkg-name: "lite", python-version: "3.10", pytorch-version: "1.12"} - - {os: "windows-2022", pkg-name: "lite", python-version: "3.9", pytorch-version: "1.11"} - - {os: "windows-2022", pkg-name: "lite", python-version: "3.10", pytorch-version: "1.12"} - # only run PyTorch latest with Python latest - - {os: "macOS-11", pkg-name: "lite", python-version: "3.10", pytorch-version: "1.13"} - - {os: "ubuntu-20.04", pkg-name: "lite", python-version: "3.10", pytorch-version: "1.13"} - - {os: "windows-2022", pkg-name: "lite", python-version: "3.10", pytorch-version: "1.13"} - # "oldest" versions tests, only on minimum Python - - {os: "macOS-11", pkg-name: "lite", python-version: "3.7", pytorch-version: "1.10", requires: "oldest"} - - {os: "ubuntu-20.04", pkg-name: "lite", python-version: "3.7", pytorch-version: "1.10", requires: "oldest"} - - {os: "windows-2022", pkg-name: "lite", python-version: "3.7", pytorch-version: "1.10", requires: "oldest"} - # "lightning" installs the monolithic package - - {os: "macOS-11", pkg-name: "lightning", python-version: "3.8", pytorch-version: "1.13"} - - {os: "ubuntu-20.04", pkg-name: "lightning", python-version: "3.8", pytorch-version: "1.13"} - - {os: "windows-2022", pkg-name: "lightning", python-version: "3.8", pytorch-version: "1.13"} - - timeout-minutes: 15 - - steps: - - uses: actions/checkout@v3 - - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v4 - with: - python-version: ${{ matrix.python-version }} - - - name: basic setup - run: | - pip --version - pip install -q -r .actions/requirements.txt - - - name: Setup Windows - if: runner.os == 'windows' - run: | - python .actions/assistant.py requirements_prune_pkgs "--packages=[horovod]" - - - name: Set min. dependencies - if: ${{ matrix.requires == 'oldest' }} - run: | - python .actions/assistant.py replace_oldest_ver - - - name: Adjust PyTorch versions in requirements files - if: ${{ matrix.requires != 'oldest' }} - run: | - python ./requirements/pytorch/adjust-versions.py requirements/lite/base.txt ${{ matrix.pytorch-version }} - cat requirements/lite/base.txt - - - name: Get pip cache dir - id: pip-cache - run: echo "dir=$(pip cache dir)" >> $GITHUB_OUTPUT - - - name: pip cache - uses: actions/cache@v3 - with: - path: ${{ steps.pip-cache.outputs.dir }} - key: ${{ runner.os }}-pip-py${{ matrix.python-version }}-${{ matrix.pkg-name }}-${{ matrix.release }}-${{ matrix.requires }}-${{ hashFiles('requirements/lite/*.txt') }} - restore-keys: | - ${{ runner.os }}-pip-py${{ matrix.python-version }}-${{ matrix.pkg-name }}-${{ matrix.release }}-${{ matrix.requires }}- - - - name: Switch PyTorch URL - run: python -c "print('TORCH_URL=https://download.pytorch.org/whl/' + str('test/cpu/torch_test.html' if '${{matrix.release}}' == 'pre' else 'cpu/torch_stable.html'))" >> $GITHUB_ENV - - - name: Install package & dependencies - env: - PACKAGE_NAME: ${{ matrix.pkg-name }} - run: | - pip install -e . "pytest-timeout" -r requirements/lite/devel.txt --upgrade --find-links ${TORCH_URL} - pip list - - - name: Adjust tests - if: ${{ matrix.pkg-name == 'lightning' }} - run: | - python .actions/assistant.py copy_replace_imports --source_dir="./tests" \ - --source_import="lightning_lite" --target_import="lightning.lite" - - - name: Testing Warnings - # the stacklevel can only be set on >=3.7 - if: matrix.python-version != '3.7' - working-directory: tests/tests_lite - # needs to run outside of `pytest` - run: python utilities/test_warnings.py - - - name: Switch coverage scope - run: python -c "print('COVERAGE_SCOPE=' + str('lightning' if '${{matrix.pkg-name}}' == 'lightning' else 'lightning_lite'))" >> $GITHUB_ENV - - - name: Testing Lite - working-directory: tests/tests_lite - # NOTE: do not include coverage report here, see: https://github.com/nedbat/coveragepy/issues/1003 - run: coverage run --source ${COVERAGE_SCOPE} -m pytest -v --timeout=30 --durations=50 --junitxml=results-${{ runner.os }}-py${{ matrix.python-version }}-${{ matrix.requires }}-${{ matrix.release }}.xml - - - name: Upload pytest results - if: failure() - uses: actions/upload-artifact@v3 - with: - name: unittest-results-${{ runner.os }}-py${{ matrix.python-version }}-${{ matrix.requires }}-${{ matrix.release }} - path: tests/tests_lite/results-${{ runner.os }}-py${{ matrix.python-version }}-${{ matrix.requires }}-${{ matrix.release }}.xml - - - name: Statistics - if: success() - working-directory: tests/tests_lite - run: | - coverage report - coverage xml - - - name: Upload coverage to Codecov - uses: codecov/codecov-action@v3 - # see: https://github.com/actions/toolkit/issues/399 - continue-on-error: true - with: - token: ${{ secrets.CODECOV_TOKEN }} - file: tests/tests_lite/coverage.xml - flags: ${COVERAGE_SCOPE},cpu,pytest,python${{ matrix.python-version }} - name: CPU-coverage - fail_ci_if_error: false diff --git a/.github/workflows/ci-tests-pytorch.yml b/.github/workflows/ci-tests-pytorch.yml index 43e3b41a3d9f36..2239154399874c 100644 --- a/.github/workflows/ci-tests-pytorch.yml +++ b/.github/workflows/ci-tests-pytorch.yml @@ -3,9 +3,9 @@ name: Test PyTorch # see: https://help.github.com/en/actions/reference/events-that-trigger-workflows on: push: - branches: [master, "release/*", "lite/debug"] + branches: [master, "release/*"] pull_request: - branches: [master, "release/*", "lite/debug"] + branches: [master, "release/*"] types: [opened, reopened, ready_for_review, synchronize] # added `ready_for_review` since draft is skipped paths: - ".actions/**" @@ -174,8 +174,8 @@ jobs: if: ${{ matrix.pkg-name == 'lightning' }} run: | python .actions/assistant.py copy_replace_imports --source_dir="./tests" \ - --source_import="pytorch_lightning,lightning_fabric" \ - --target_import="lightning.pytorch,lightning.fabric" + --source_import="pytorch_lightning,lightning_lite" \ + --target_import="lightning.pytorch,lightning.lite" - name: Testing Warnings # the stacklevel can only be set on >=3.7 diff --git a/pyproject.toml b/pyproject.toml index c8f35d75da3556..782ba81779de35 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -81,7 +81,6 @@ module = [ "lightning_app.components.serve.python_server", "lightning_app.components.serve.auto_scaler", "lightning_app.components.training", - "lightning_app.components.auto_scaler", "lightning_app.core.api", "lightning_app.core.app", "lightning_app.core.flow", diff --git a/src/lightning_app/CHANGELOG.md b/src/lightning_app/CHANGELOG.md index 1629b745ea4f31..83dad82cef8b9a 100644 --- a/src/lightning_app/CHANGELOG.md +++ b/src/lightning_app/CHANGELOG.md @@ -18,6 +18,18 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). - Added automatic conversion of list and dict of works and flows to structures ([#15961](https://github.com/Lightning-AI/lightning/pull/15961)) +- Added partial support for fastapi `Request` annotation in `configure_api` handlers ([#16047](https://github.com/Lightning-AI/lightning/pull/16047)) + +- Added a nicer UI with URL and examples for the autoscaler component ([#16063](https://github.com/Lightning-AI/lightning/pull/16063)) + +- Enabled users to have more control over scaling out/in interval ([#16093](https://github.com/Lightning-AI/lightning/pull/16093)) + +- Added more datatypes to serving component ([#16018](https://github.com/Lightning-AI/lightning/pull/16018)) + +- Added `work.delete` method to delete the work ([#16103](https://github.com/Lightning-AI/lightning/pull/16103)) + +- Added `display_name` property to LightningWork for the cloud ([#16095](https://github.com/Lightning-AI/lightning/pull/16095)) + ### Changed @@ -39,6 +51,14 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). - +- Fixed the debugger detection mechanism for lightning App in VSCode ([#16068](https://github.com/Lightning-AI/lightning/pull/16068)) + + +- Fixed bug where components that are re-instantiated several times failed to initialize if they were modifying `self.lightningignore` ([#16080](https://github.com/Lightning-AI/lightning/pull/16080)) + + +- Fixed a bug where apps that had previously been deleted could not be run again from the CLI ([#16082](https://github.com/Lightning-AI/lightning/pull/16082)) + - Fixed MPS error for multinode component (defaults to cpu on mps devices now as distributed operations are not supported by pytorch on mps) ([#15748](https://github.com/Ligtning-AI/lightning/pull/15748)) ## [1.8.6] - 2022-12-21 diff --git a/src/lightning_app/components/__init__.py b/src/lightning_app/components/__init__.py index 25fb86f0233ae8..0275596288ff06 100644 --- a/src/lightning_app/components/__init__.py +++ b/src/lightning_app/components/__init__.py @@ -1,4 +1,3 @@ -from lightning_app.components.auto_scaler import AutoScaler from lightning_app.components.database.client import DatabaseClient from lightning_app.components.database.server import Database from lightning_app.components.multi_node import ( diff --git a/src/lightning_app/components/auto_scaler.py b/src/lightning_app/components/auto_scaler.py deleted file mode 100644 index 13948ba50af892..00000000000000 --- a/src/lightning_app/components/auto_scaler.py +++ /dev/null @@ -1,578 +0,0 @@ -import asyncio -import logging -import os -import secrets -import time -import uuid -from base64 import b64encode -from itertools import cycle -from typing import Any, Dict, List, Tuple, Type - -import requests -import uvicorn -from fastapi import Depends, FastAPI, HTTPException, Request -from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import RedirectResponse -from fastapi.security import HTTPBasic, HTTPBasicCredentials -from pydantic import BaseModel -from starlette.status import HTTP_401_UNAUTHORIZED - -from lightning_app.core.flow import LightningFlow -from lightning_app.core.work import LightningWork -from lightning_app.utilities.app_helpers import Logger -from lightning_app.utilities.imports import _is_aiohttp_available, requires -from lightning_app.utilities.packaging.cloud_compute import CloudCompute - -if _is_aiohttp_available(): - import aiohttp - import aiohttp.client_exceptions - -logger = Logger(__name__) - - -def _raise_granular_exception(exception: Exception) -> None: - """Handle an exception from hitting the model servers.""" - if not isinstance(exception, Exception): - return - - if isinstance(exception, HTTPException): - raise exception - - if isinstance(exception, aiohttp.client_exceptions.ServerDisconnectedError): - raise HTTPException(500, "Worker Server Disconnected") from exception - - if isinstance(exception, aiohttp.client_exceptions.ClientError): - logging.exception(exception) - raise HTTPException(500, "Worker Server error") from exception - - if isinstance(exception, asyncio.TimeoutError): - raise HTTPException(408, "Request timed out") from exception - - if isinstance(exception, Exception): - if exception.args[0] == "Server disconnected": - raise HTTPException(500, "Worker Server disconnected") from exception - - logging.exception(exception) - raise HTTPException(500, exception.args[0]) from exception - - -class _SysInfo(BaseModel): - num_workers: int - servers: List[str] - num_requests: int - processing_time: int - global_request_count: int - - -class _BatchRequestModel(BaseModel): - inputs: List[Any] - - -def _create_fastapi(title: str) -> FastAPI: - fastapi_app = FastAPI(title=title) - - fastapi_app.add_middleware( - CORSMiddleware, - allow_origins=["*"], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], - ) - - fastapi_app.global_request_count = 0 - fastapi_app.num_current_requests = 0 - fastapi_app.last_processing_time = 0 - - @fastapi_app.get("/", include_in_schema=False) - async def docs(): - return RedirectResponse("/docs") - - @fastapi_app.get("/num-requests") - async def num_requests() -> int: - return fastapi_app.num_current_requests - - return fastapi_app - - -class _LoadBalancer(LightningWork): - r"""The LoadBalancer is a LightningWork component that collects the requests and sends them to the prediciton API - asynchronously using RoundRobin scheduling. It also performs auto batching of the incoming requests. - - The LoadBalancer exposes system endpoints with a basic HTTP authentication, in order to activate the authentication - you need to provide a system password from environment variable:: - - lightning run app app.py --env AUTO_SCALER_AUTH_PASSWORD=PASSWORD - - After enabling you will require to send username and password from the request header for the private endpoints. - - Args: - input_type: Input type. - output_type: Output type. - endpoint: The REST API path. - max_batch_size: The number of requests processed at once. - timeout_batching: The number of seconds to wait before sending the requests to process in order to allow for - requests to be batched. In any case, requests are processed as soon as `max_batch_size` is reached. - timeout_keep_alive: The number of seconds until it closes Keep-Alive connections if no new data is received. - timeout_inference_request: The number of seconds to wait for inference. - \**kwargs: Arguments passed to :func:`LightningWork.init` like ``CloudCompute``, ``BuildConfig``, etc. - """ - - @requires(["aiohttp"]) - def __init__( - self, - input_type: BaseModel, - output_type: BaseModel, - endpoint: str, - max_batch_size: int = 8, - # all timeout args are in seconds - timeout_batching: int = 1, - timeout_keep_alive: int = 60, - timeout_inference_request: int = 60, - **kwargs: Any, - ) -> None: - super().__init__(cloud_compute=CloudCompute("default"), **kwargs) - self._input_type = input_type - self._output_type = output_type - self._timeout_keep_alive = timeout_keep_alive - self._timeout_inference_request = timeout_inference_request - self.servers = [] - self.max_batch_size = max_batch_size - self.timeout_batching = timeout_batching - self._iter = None - self._batch = [] - self._responses = {} # {request_id: response} - self._last_batch_sent = 0 - - if not endpoint.startswith("/"): - endpoint = "/" + endpoint - - self.endpoint = endpoint - - async def send_batch(self, batch: List[Tuple[str, _BatchRequestModel]]): - server = next(self._iter) # round-robin - request_data: List[_LoadBalancer._input_type] = [b[1] for b in batch] - batch_request_data = _BatchRequestModel(inputs=request_data) - - try: - async with aiohttp.ClientSession() as session: - headers = { - "accept": "application/json", - "Content-Type": "application/json", - } - async with session.post( - f"{server}{self.endpoint}", - json=batch_request_data.dict(), - timeout=self._timeout_inference_request, - headers=headers, - ) as response: - if response.status == 408: - raise HTTPException(408, "Request timed out") - response.raise_for_status() - response = await response.json() - outputs = response["outputs"] - if len(batch) != len(outputs): - raise RuntimeError(f"result has {len(outputs)} items but batch is {len(batch)}") - result = {request[0]: r for request, r in zip(batch, outputs)} - self._responses.update(result) - except Exception as ex: - result = {request[0]: ex for request in batch} - self._responses.update(result) - - async def consumer(self): - while True: - await asyncio.sleep(0.05) - - batch = self._batch[: self.max_batch_size] - while batch and ( - (len(batch) == self.max_batch_size) or ((time.time() - self._last_batch_sent) > self.timeout_batching) - ): - asyncio.create_task(self.send_batch(batch)) - - self._batch = self._batch[self.max_batch_size :] - batch = self._batch[: self.max_batch_size] - self._last_batch_sent = time.time() - - async def process_request(self, data: BaseModel): - if not self.servers: - raise HTTPException(500, "None of the workers are healthy!") - - request_id = uuid.uuid4().hex - request: Tuple = (request_id, data) - self._batch.append(request) - - while True: - await asyncio.sleep(0.05) - - if request_id in self._responses: - result = self._responses[request_id] - del self._responses[request_id] - _raise_granular_exception(result) - return result - - def run(self): - logger.info(f"servers: {self.servers}") - lock = asyncio.Lock() - - self._iter = cycle(self.servers) - self._last_batch_sent = time.time() - - fastapi_app = _create_fastapi("Load Balancer") - security = HTTPBasic() - fastapi_app.SEND_TASK = None - - @fastapi_app.middleware("http") - async def current_request_counter(request: Request, call_next): - if not request.scope["path"] == self.endpoint: - return await call_next(request) - fastapi_app.global_request_count += 1 - fastapi_app.num_current_requests += 1 - start_time = time.time() - response = await call_next(request) - processing_time = time.time() - start_time - fastapi_app.last_processing_time = processing_time - fastapi_app.num_current_requests -= 1 - return response - - @fastapi_app.on_event("startup") - async def startup_event(): - fastapi_app.SEND_TASK = asyncio.create_task(self.consumer()) - - @fastapi_app.on_event("shutdown") - def shutdown_event(): - fastapi_app.SEND_TASK.cancel() - - def authenticate_private_endpoint(credentials: HTTPBasicCredentials = Depends(security)): - AUTO_SCALER_AUTH_PASSWORD = os.environ.get("AUTO_SCALER_AUTH_PASSWORD", "") - if len(AUTO_SCALER_AUTH_PASSWORD) == 0: - logger.warn( - "You have not set a password for private endpoints! To set a password, add " - "`--env AUTO_SCALER_AUTH_PASSWORD=` to your lightning run command." - ) - current_password_bytes = credentials.password.encode("utf8") - is_correct_password = secrets.compare_digest( - current_password_bytes, AUTO_SCALER_AUTH_PASSWORD.encode("utf8") - ) - if not is_correct_password: - raise HTTPException( - status_code=401, - detail="Incorrect password", - headers={"WWW-Authenticate": "Basic"}, - ) - return True - - @fastapi_app.get("/system/info", response_model=_SysInfo) - async def sys_info(authenticated: bool = Depends(authenticate_private_endpoint)): - return _SysInfo( - num_workers=len(self.servers), - servers=self.servers, - num_requests=fastapi_app.num_current_requests, - processing_time=fastapi_app.last_processing_time, - global_request_count=fastapi_app.global_request_count, - ) - - @fastapi_app.put("/system/update-servers") - async def update_servers(servers: List[str], authenticated: bool = Depends(authenticate_private_endpoint)): - async with lock: - self.servers = servers - self._iter = cycle(self.servers) - - @fastapi_app.post(self.endpoint, response_model=self._output_type) - async def balance_api(inputs: self._input_type): - return await self.process_request(inputs) - - uvicorn.run( - fastapi_app, - host=self.host, - port=self.port, - loop="uvloop", - timeout_keep_alive=self._timeout_keep_alive, - access_log=False, - ) - - def update_servers(self, server_works: List[LightningWork]): - """Updates works that load balancer distributes requests to. - - AutoScaler uses this method to increase/decrease the number of works. - """ - old_servers = set(self.servers) - server_urls: List[str] = [server.url for server in server_works if server.url] - new_servers = set(server_urls) - - if new_servers == old_servers: - return - - if new_servers - old_servers: - logger.info(f"servers added: {new_servers - old_servers}") - - deleted_servers = old_servers - new_servers - if deleted_servers: - logger.info(f"servers deleted: {deleted_servers}") - - self.send_request_to_update_servers(server_urls) - - def send_request_to_update_servers(self, servers: List[str]): - AUTHORIZATION_TYPE = "Basic" - USERNAME = "lightning" - AUTO_SCALER_AUTH_PASSWORD = os.environ.get("AUTO_SCALER_AUTH_PASSWORD", "") - - try: - param = f"{USERNAME}:{AUTO_SCALER_AUTH_PASSWORD}".encode() - data = b64encode(param).decode("utf-8") - except (ValueError, UnicodeDecodeError) as e: - raise HTTPException( - status_code=HTTP_401_UNAUTHORIZED, - detail="Invalid authentication credentials", - headers={"WWW-Authenticate": "Basic"}, - ) from e - headers = { - "accept": "application/json", - "username": USERNAME, - "Authorization": AUTHORIZATION_TYPE + " " + data, - } - response = requests.put(f"{self.url}/system/update-servers", json=servers, headers=headers, timeout=10) - response.raise_for_status() - - -class AutoScaler(LightningFlow): - """The ``AutoScaler`` can be used to automatically change the number of replicas of the given server in - response to changes in the number of incoming requests. Incoming requests will be batched and balanced across - the replicas. - - Args: - min_replicas: The number of works to start when app initializes. - max_replicas: The max number of works to spawn to handle the incoming requests. - autoscale_interval: The number of seconds to wait before checking whether to upscale or downscale the works. - endpoint: Provide the REST API path. - max_batch_size: (auto-batching) The number of requests to process at once. - timeout_batching: (auto-batching) The number of seconds to wait before sending the requests to process. - input_type: Input type. - output_type: Output type. - - .. testcode:: - - import lightning as L - - # Example 1: Auto-scaling serve component out-of-the-box - app = L.LightningApp( - L.app.components.AutoScaler( - MyPythonServer, - min_replicas=1, - max_replicas=8, - autoscale_interval=10, - ) - ) - - # Example 2: Customizing the scaling logic - class MyAutoScaler(L.app.components.AutoScaler): - def scale(self, replicas: int, metrics: dict) -> int: - pending_requests_per_running_or_pending_work = metrics["pending_requests"] / ( - replicas + metrics["pending_works"] - ) - - # upscale - max_requests_per_work = self.max_batch_size - if pending_requests_per_running_or_pending_work >= max_requests_per_work: - return replicas + 1 - - # downscale - min_requests_per_work = max_requests_per_work * 0.25 - if pending_requests_per_running_or_pending_work < min_requests_per_work: - return replicas - 1 - - return replicas - - - app = L.LightningApp( - MyAutoScaler( - MyPythonServer, - min_replicas=1, - max_replicas=8, - autoscale_interval=10, - max_batch_size=8, # for auto batching - timeout_batching=1, # for auto batching - ) - ) - """ - - def __init__( - self, - work_cls: Type[LightningWork], - min_replicas: int = 1, - max_replicas: int = 4, - autoscale_interval: int = 10, - max_batch_size: int = 8, - timeout_batching: float = 1, - endpoint: str = "api/predict", - input_type: BaseModel = Dict, - output_type: BaseModel = Dict, - *work_args: Any, - **work_kwargs: Any, - ) -> None: - super().__init__() - self.num_replicas = 0 - self._work_registry = {} - - self._work_cls = work_cls - self._work_args = work_args - self._work_kwargs = work_kwargs - - self._input_type = input_type - self._output_type = output_type - self.autoscale_interval = autoscale_interval - self.max_batch_size = max_batch_size - - if max_replicas < min_replicas: - raise ValueError( - f"`max_replicas={max_replicas}` must be less than or equal to `min_replicas={min_replicas}`." - ) - self.max_replicas = max_replicas - self.min_replicas = min_replicas - self._last_autoscale = time.time() - self.fake_trigger = 0 - - self.load_balancer = _LoadBalancer( - input_type=self._input_type, - output_type=self._output_type, - endpoint=endpoint, - max_batch_size=max_batch_size, - timeout_batching=timeout_batching, - cache_calls=True, - parallel=True, - ) - for _ in range(min_replicas): - work = self.create_work() - self.add_work(work) - - @property - def workers(self) -> List[LightningWork]: - return [self.get_work(i) for i in range(self.num_replicas)] - - def create_work(self) -> LightningWork: - """Replicates a LightningWork instance with args and kwargs provided via ``__init__``.""" - cloud_compute = self._work_kwargs.get("cloud_compute", None) - self._work_kwargs.update( - dict( - # TODO: Remove `start_with_flow=False` for faster initialization on the cloud - start_with_flow=False, - # don't try to create multiple works in a single machine - cloud_compute=cloud_compute.clone() if cloud_compute else None, - ) - ) - return self._work_cls(*self._work_args, **self._work_kwargs) - - def add_work(self, work) -> str: - """Adds a new LightningWork instance. - - Returns: - The name of the new work attribute. - """ - work_attribute = uuid.uuid4().hex - work_attribute = f"worker_{self.num_replicas}_{str(work_attribute)}" - setattr(self, work_attribute, work) - self._work_registry[self.num_replicas] = work_attribute - self.num_replicas += 1 - return work_attribute - - def remove_work(self, index: int) -> str: - """Removes the ``index`` th LightningWork instance.""" - work_attribute = self._work_registry[index] - del self._work_registry[index] - work = getattr(self, work_attribute) - work.stop() - self.num_replicas -= 1 - return work_attribute - - def get_work(self, index: int) -> LightningWork: - """Returns the ``LightningWork`` instance with the given index.""" - work_attribute = self._work_registry[index] - work = getattr(self, work_attribute) - return work - - def run(self): - if not self.load_balancer.is_running: - self.load_balancer.run() - - for work in self.workers: - work.run() - - if self.load_balancer.url: - self.fake_trigger += 1 # Note: change state to keep calling `run`. - self.autoscale() - - def scale(self, replicas: int, metrics: dict) -> int: - """The default scaling logic that users can override. - - Args: - replicas: The number of running works. - metrics: ``metrics['pending_requests']`` is the total number of requests that are currently pending. - ``metrics['pending_works']`` is the number of pending works. - - Returns: - The target number of running works. The value will be adjusted after this method runs - so that it satisfies ``min_replicas<=replicas<=max_replicas``. - """ - pending_requests_per_running_or_pending_work = metrics["pending_requests"] / ( - replicas + metrics["pending_works"] - ) - - # scale out if the number of pending requests exceeds max batch size. - max_requests_per_work = self.max_batch_size - if pending_requests_per_running_or_pending_work >= max_requests_per_work: - return replicas + 1 - - # scale in if the number of pending requests is below 25% of max_requests_per_work - min_requests_per_work = max_requests_per_work * 0.25 - if pending_requests_per_running_or_pending_work < min_requests_per_work: - return replicas - 1 - - return replicas - - @property - def num_pending_requests(self) -> int: - """Fetches the number of pending requests via load balancer.""" - return int(requests.get(f"{self.load_balancer.url}/num-requests").json()) - - @property - def num_pending_works(self) -> int: - """The number of pending works.""" - return sum(work.is_pending for work in self.workers) - - def autoscale(self) -> None: - """Adjust the number of works based on the target number returned by ``self.scale``.""" - if time.time() - self._last_autoscale < self.autoscale_interval: - return - - self.load_balancer.update_servers(self.workers) - - metrics = { - "pending_requests": self.num_pending_requests, - "pending_works": self.num_pending_works, - } - - # ensure min_replicas <= num_replicas <= max_replicas - num_target_workers = max( - self.min_replicas, - min(self.max_replicas, self.scale(self.num_replicas, metrics)), - ) - - # upscale - num_workers_to_add = num_target_workers - self.num_replicas - for _ in range(num_workers_to_add): - logger.info(f"Upscaling from {self.num_replicas} to {self.num_replicas + 1}") - work = self.create_work() - new_work_id = self.add_work(work) - logger.info(f"Work created: '{new_work_id}'") - - # downscale - num_workers_to_remove = self.num_replicas - num_target_workers - for _ in range(num_workers_to_remove): - logger.info(f"Downscaling from {self.num_replicas} to {self.num_replicas - 1}") - removed_work_id = self.remove_work(self.num_replicas - 1) - logger.info(f"Work removed: '{removed_work_id}'") - - self.load_balancer.update_servers(self.workers) - self._last_autoscale = time.time() - - def configure_layout(self): - tabs = [{"name": "Swagger", "content": self.load_balancer.url}] - return tabs diff --git a/src/lightning_app/core/flow.py b/src/lightning_app/core/flow.py index 4b2c54b17f96f0..ee2931a6afadb7 100644 --- a/src/lightning_app/core/flow.py +++ b/src/lightning_app/core/flow.py @@ -10,13 +10,7 @@ from lightning_app.frontend import Frontend from lightning_app.storage import Path from lightning_app.storage.drive import _maybe_create_drive, Drive -from lightning_app.utilities.app_helpers import ( - _is_json_serializable, - _lightning_dispatched, - _LightningAppRef, - _set_child_name, - is_overridden, -) +from lightning_app.utilities.app_helpers import _is_json_serializable, _LightningAppRef, _set_child_name, is_overridden from lightning_app.utilities.component import _sanitize_state from lightning_app.utilities.exceptions import ExitAppException from lightning_app.utilities.introspection import _is_init_context, _is_run_context @@ -255,10 +249,7 @@ def __getattr__(self, item): @property def ready(self) -> bool: - """Not currently enabled. - - Override to customize when your App should be ready. - """ + """Override to customize when your App should be ready.""" flows = self.flows return all(flow.ready for flow in flows.values()) if flows else True diff --git a/src/lightning_app/utilities/frontend.py b/src/lightning_app/utilities/frontend.py index 470036436a63cf..afc5f215398623 100644 --- a/src/lightning_app/utilities/frontend.py +++ b/src/lightning_app/utilities/frontend.py @@ -22,11 +22,12 @@ def update_index_file(ui_root: str, info: Optional[AppInfo] = None, root_path: s entry_file = Path(ui_root) / "index.html" original_file = Path(ui_root) / "index.original.html" - if not original_file.exists(): - shutil.copyfile(entry_file, original_file) # keep backup - else: - # revert index.html in case it was modified after creating original.html - shutil.copyfile(original_file, entry_file) + if root_path: + if not original_file.exists(): + shutil.copyfile(entry_file, original_file) # keep backup + else: + # revert index.html in case it was modified after creating original.html + shutil.copyfile(original_file, entry_file) if info: with original_file.open() as f: diff --git a/tests/tests_app/cli/test_cloud_cli.py b/tests/tests_app/cli/test_cloud_cli.py index a66ad518d5b4b5..3b9317a3a9613b 100644 --- a/tests/tests_app/cli/test_cloud_cli.py +++ b/tests/tests_app/cli/test_cloud_cli.py @@ -11,6 +11,7 @@ from lightning_cloud.openapi import ( V1LightningappV2, V1ListLightningappInstancesResponse, + V1ListLightningappsV2Response, V1ListMembershipsResponse, V1Membership, ) @@ -185,7 +186,7 @@ def __init__(self, *args, message, **kwargs): super().__init__() self.message = message - def lightningapp_instance_service_list_lightningapp_instances(self, *args, **kwargs): + def lightningapp_v2_service_list_lightningapps_v2(self, *args, **kwargs): raise ApiException( http_resp=HttpHeaderDict( data=self.message, diff --git a/tests/tests_app/components/test_auto_scaler.py b/tests/tests_app/components/test_auto_scaler.py deleted file mode 100644 index 672b05bbc9a151..00000000000000 --- a/tests/tests_app/components/test_auto_scaler.py +++ /dev/null @@ -1,100 +0,0 @@ -import time -from unittest.mock import patch - -import pytest - -from lightning_app import CloudCompute, LightningWork -from lightning_app.components import AutoScaler - - -class EmptyWork(LightningWork): - def run(self): - pass - - -class AutoScaler1(AutoScaler): - def scale(self, replicas: int, metrics) -> int: - # only upscale - return replicas + 1 - - -class AutoScaler2(AutoScaler): - def scale(self, replicas: int, metrics) -> int: - # only downscale - return replicas - 1 - - -def test_num_replicas_after_init(): - """Test the number of works is the same as min_replicas after initialization.""" - min_replicas = 2 - auto_scaler = AutoScaler(EmptyWork, min_replicas=min_replicas) - assert auto_scaler.num_replicas == min_replicas - - -@patch("uvicorn.run") -@patch("lightning_app.components.auto_scaler._LoadBalancer.url") -@patch("lightning_app.components.auto_scaler.AutoScaler.num_pending_requests") -def test_num_replicas_not_above_max_replicas(*_): - """Test self.num_replicas doesn't exceed max_replicas.""" - max_replicas = 6 - auto_scaler = AutoScaler1( - EmptyWork, - min_replicas=1, - max_replicas=max_replicas, - autoscale_interval=0.001, - ) - - for _ in range(max_replicas + 1): - time.sleep(0.002) - auto_scaler.run() - - assert auto_scaler.num_replicas == max_replicas - - -@patch("uvicorn.run") -@patch("lightning_app.components.auto_scaler._LoadBalancer.url") -@patch("lightning_app.components.auto_scaler.AutoScaler.num_pending_requests") -def test_num_replicas_not_belo_min_replicas(*_): - """Test self.num_replicas doesn't exceed max_replicas.""" - min_replicas = 1 - auto_scaler = AutoScaler2( - EmptyWork, - min_replicas=min_replicas, - max_replicas=4, - autoscale_interval=0.001, - ) - - for _ in range(3): - time.sleep(0.002) - auto_scaler.run() - - assert auto_scaler.num_replicas == min_replicas - - -@pytest.mark.parametrize( - "replicas, metrics, expected_replicas", - [ - pytest.param(1, {"pending_requests": 1, "pending_works": 0}, 2, id="increase if no pending work"), - pytest.param(1, {"pending_requests": 1, "pending_works": 1}, 1, id="dont increase if pending works"), - pytest.param(8, {"pending_requests": 1, "pending_works": 0}, 7, id="reduce if requests < 25% capacity"), - pytest.param(8, {"pending_requests": 2, "pending_works": 0}, 8, id="dont reduce if requests >= 25% capacity"), - ], -) -def test_scale(replicas, metrics, expected_replicas): - """Test `scale()`, the default scaling strategy.""" - auto_scaler = AutoScaler( - EmptyWork, - min_replicas=1, - max_replicas=8, - max_batch_size=1, - ) - - assert auto_scaler.scale(replicas, metrics) == expected_replicas - - -def test_create_work_cloud_compute_cloned(): - """Test CloudCompute is cloned to avoid creating multiple works in a single machine.""" - cloud_compute = CloudCompute("gpu") - auto_scaler = AutoScaler(EmptyWork, cloud_compute=cloud_compute) - _ = auto_scaler.create_work() - assert auto_scaler._work_kwargs["cloud_compute"] is not cloud_compute