From 0fd3d542058d3d0b228f3d7eafafae568dbd0c64 Mon Sep 17 00:00:00 2001 From: Sherin Thomas Date: Mon, 19 Dec 2022 13:49:00 +0000 Subject: [PATCH] [App] Scale out/in interval for autoscaler (#16093) * Adding arguments for scale out/in interval * Tests --- examples/app_server_with_auto_scaler/app.py | 3 +- src/lightning_app/CHANGELOG.md | 2 + .../components/serve/auto_scaler.py | 56 ++++++++++--------- .../components/serve/test_auto_scaler.py | 46 ++++++++++++++- 4 files changed, 79 insertions(+), 28 deletions(-) diff --git a/examples/app_server_with_auto_scaler/app.py b/examples/app_server_with_auto_scaler/app.py index 453db2424b404..2c8fb744c4fcf 100644 --- a/examples/app_server_with_auto_scaler/app.py +++ b/examples/app_server_with_auto_scaler/app.py @@ -75,7 +75,8 @@ def scale(self, replicas: int, metrics: dict) -> int: # autoscaler specific args min_replicas=1, max_replicas=4, - autoscale_interval=10, + scale_out_interval=10, + scale_in_interval=10, endpoint="predict", input_type=L.app.components.Image, output_type=L.app.components.Number, diff --git a/src/lightning_app/CHANGELOG.md b/src/lightning_app/CHANGELOG.md index eb0262ade8132..4ee679fcd6493 100644 --- a/src/lightning_app/CHANGELOG.md +++ b/src/lightning_app/CHANGELOG.md @@ -16,6 +16,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). - Added a nicer UI with URL and examples for the autoscaler component ([#16063](https://github.com/Lightning-AI/lightning/pull/16063)) +- Enabled users to have more control over scaling out/in interval ([#16093](https://github.com/Lightning-AI/lightning/pull/16093)) + - Added more datatypes to serving component ([#16018](https://github.com/Lightning-AI/lightning/pull/16018)) - Added `work.delete` method to delete the work ([#16103](https://github.com/Lightning-AI/lightning/pull/16103)) diff --git a/src/lightning_app/components/serve/auto_scaler.py b/src/lightning_app/components/serve/auto_scaler.py index 6b25c5fce3860..2493f63048e60 100644 --- a/src/lightning_app/components/serve/auto_scaler.py +++ b/src/lightning_app/components/serve/auto_scaler.py @@ -409,7 +409,8 @@ class AutoScaler(LightningFlow): Args: min_replicas: The number of works to start when app initializes. max_replicas: The max number of works to spawn to handle the incoming requests. - autoscale_interval: The number of seconds to wait before checking whether to upscale or downscale the works. + scale_out_interval: The number of seconds to wait before checking whether to increase the number of servers. + scale_in_interval: The number of seconds to wait before checking whether to decrease the number of servers. endpoint: Provide the REST API path. max_batch_size: (auto-batching) The number of requests to process at once. timeout_batching: (auto-batching) The number of seconds to wait before sending the requests to process. @@ -426,7 +427,8 @@ class AutoScaler(LightningFlow): MyPythonServer, min_replicas=1, max_replicas=8, - autoscale_interval=10, + scale_out_interval=10, + scale_in_interval=10, ) ) @@ -455,7 +457,8 @@ def scale(self, replicas: int, metrics: dict) -> int: MyPythonServer, min_replicas=1, max_replicas=8, - autoscale_interval=10, + scale_out_interval=10, + scale_in_interval=10, max_batch_size=8, # for auto batching timeout_batching=1, # for auto batching ) @@ -467,7 +470,8 @@ def __init__( work_cls: Type[LightningWork], min_replicas: int = 1, max_replicas: int = 4, - autoscale_interval: int = 10, + scale_out_interval: int = 10, + scale_in_interval: int = 10, max_batch_size: int = 8, timeout_batching: float = 1, endpoint: str = "api/predict", @@ -486,7 +490,8 @@ def __init__( self._input_type = input_type self._output_type = output_type - self.autoscale_interval = autoscale_interval + self.scale_out_interval = scale_out_interval + self.scale_in_interval = scale_in_interval self.max_batch_size = max_batch_size if max_replicas < min_replicas: @@ -612,11 +617,6 @@ def num_pending_works(self) -> int: def autoscale(self) -> None: """Adjust the number of works based on the target number returned by ``self.scale``.""" - if time.time() - self._last_autoscale < self.autoscale_interval: - return - - self.load_balancer.update_servers(self.workers) - metrics = { "pending_requests": self.num_pending_requests, "pending_works": self.num_pending_works, @@ -628,23 +628,29 @@ def autoscale(self) -> None: min(self.max_replicas, self.scale(self.num_replicas, metrics)), ) - # upscale - num_workers_to_add = num_target_workers - self.num_replicas - for _ in range(num_workers_to_add): - logger.info(f"Upscaling from {self.num_replicas} to {self.num_replicas + 1}") - work = self.create_work() - new_work_id = self.add_work(work) - logger.info(f"Work created: '{new_work_id}'") - - # downscale - num_workers_to_remove = self.num_replicas - num_target_workers - for _ in range(num_workers_to_remove): - logger.info(f"Downscaling from {self.num_replicas} to {self.num_replicas - 1}") - removed_work_id = self.remove_work(self.num_replicas - 1) - logger.info(f"Work removed: '{removed_work_id}'") + # scale-out + if time.time() - self._last_autoscale > self.scale_out_interval: + num_workers_to_add = num_target_workers - self.num_replicas + for _ in range(num_workers_to_add): + logger.info(f"Scaling out from {self.num_replicas} to {self.num_replicas + 1}") + work = self.create_work() + # TODO: move works into structures + new_work_id = self.add_work(work) + logger.info(f"Work created: '{new_work_id}'") + if num_workers_to_add > 0: + self._last_autoscale = time.time() + + # scale-in + if time.time() - self._last_autoscale > self.scale_in_interval: + num_workers_to_remove = self.num_replicas - num_target_workers + for _ in range(num_workers_to_remove): + logger.info(f"Scaling in from {self.num_replicas} to {self.num_replicas - 1}") + removed_work_id = self.remove_work(self.num_replicas - 1) + logger.info(f"Work removed: '{removed_work_id}'") + if num_workers_to_remove > 0: + self._last_autoscale = time.time() self.load_balancer.update_servers(self.workers) - self._last_autoscale = time.time() def configure_layout(self): tabs = [ diff --git a/tests/tests_app/components/serve/test_auto_scaler.py b/tests/tests_app/components/serve/test_auto_scaler.py index 52ffa02f162c6..c3cfa99c9d69b 100644 --- a/tests/tests_app/components/serve/test_auto_scaler.py +++ b/tests/tests_app/components/serve/test_auto_scaler.py @@ -42,7 +42,8 @@ def test_num_replicas_not_above_max_replicas(*_): EmptyWork, min_replicas=1, max_replicas=max_replicas, - autoscale_interval=0.001, + scale_out_interval=0.001, + scale_in_interval=0.001, ) for _ in range(max_replicas + 1): @@ -62,7 +63,8 @@ def test_num_replicas_not_belo_min_replicas(*_): EmptyWork, min_replicas=min_replicas, max_replicas=4, - autoscale_interval=0.001, + scale_out_interval=0.001, + scale_in_interval=0.001, ) for _ in range(3): @@ -131,3 +133,43 @@ def test_API_ACCESS_ENDPOINT_creation(): auto_scaler.load_balancer.run() fastapi_mock.mount.assert_called_once_with("/endpoint-info", mock.ANY, name="static") + + +def test_autoscaler_scale_up(monkeypatch): + monkeypatch.setattr(AutoScaler, "num_pending_works", 0) + monkeypatch.setattr(AutoScaler, "num_pending_requests", 100) + monkeypatch.setattr(AutoScaler, "scale", mock.MagicMock(return_value=1)) + monkeypatch.setattr(AutoScaler, "create_work", mock.MagicMock()) + monkeypatch.setattr(AutoScaler, "add_work", mock.MagicMock()) + + auto_scaler = AutoScaler(EmptyWork, min_replicas=0, max_replicas=4, scale_out_interval=0.001) + + # Mocking the attributes + auto_scaler._last_autoscale = time.time() - 100000 + auto_scaler.num_replicas = 0 + + # triggering scale up + auto_scaler.autoscale() + auto_scaler.scale.assert_called_once() + auto_scaler.create_work.assert_called_once() + auto_scaler.add_work.assert_called_once() + + +def test_autoscaler_scale_down(monkeypatch): + monkeypatch.setattr(AutoScaler, "num_pending_works", 0) + monkeypatch.setattr(AutoScaler, "num_pending_requests", 0) + monkeypatch.setattr(AutoScaler, "scale", mock.MagicMock(return_value=0)) + monkeypatch.setattr(AutoScaler, "remove_work", mock.MagicMock()) + monkeypatch.setattr(AutoScaler, "workers", mock.MagicMock()) + + auto_scaler = AutoScaler(EmptyWork, min_replicas=0, max_replicas=4, scale_in_interval=0.001) + + # Mocking the attributes + auto_scaler._last_autoscale = time.time() - 100000 + auto_scaler.num_replicas = 1 + auto_scaler.__dict__["load_balancer"] = mock.MagicMock() + + # triggering scale up + auto_scaler.autoscale() + auto_scaler.scale.assert_called_once() + auto_scaler.remove_work.assert_called_once()