Skip to content

Commit

Permalink
[App] Scale out/in interval for autoscaler (#16093)
Browse files Browse the repository at this point in the history
* Adding arguments for scale out/in interval

* Tests

(cherry picked from commit 0fd3d54)
  • Loading branch information
Sherin Thomas authored and Borda committed Dec 20, 2022
1 parent c57af59 commit 3efeaaa
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 28 deletions.
3 changes: 2 additions & 1 deletion examples/app_server_with_auto_scaler/app.py
Expand Up @@ -75,7 +75,8 @@ def scale(self, replicas: int, metrics: dict) -> int:
# autoscaler specific args
min_replicas=1,
max_replicas=4,
autoscale_interval=10,
scale_out_interval=10,
scale_in_interval=10,
endpoint="predict",
input_type=L.app.components.Image,
output_type=L.app.components.Number,
Expand Down
2 changes: 2 additions & 0 deletions src/lightning_app/CHANGELOG.md
Expand Up @@ -13,6 +13,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).

- Added a nicer UI with URL and examples for the autoscaler component ([#16063](https://github.com/Lightning-AI/lightning/pull/16063))

- Enabled users to have more control over scaling out/in interval ([#16093](https://github.com/Lightning-AI/lightning/pull/16093))

- Added more datatypes to serving component ([#16018](https://github.com/Lightning-AI/lightning/pull/16018))

- Added `work.delete` method to delete the work ([#16103](https://github.com/Lightning-AI/lightning/pull/16103))
Expand Down
56 changes: 31 additions & 25 deletions src/lightning_app/components/serve/auto_scaler.py
Expand Up @@ -409,7 +409,8 @@ class AutoScaler(LightningFlow):
Args:
min_replicas: The number of works to start when app initializes.
max_replicas: The max number of works to spawn to handle the incoming requests.
autoscale_interval: The number of seconds to wait before checking whether to upscale or downscale the works.
scale_out_interval: The number of seconds to wait before checking whether to increase the number of servers.
scale_in_interval: The number of seconds to wait before checking whether to decrease the number of servers.
endpoint: Provide the REST API path.
max_batch_size: (auto-batching) The number of requests to process at once.
timeout_batching: (auto-batching) The number of seconds to wait before sending the requests to process.
Expand All @@ -426,7 +427,8 @@ class AutoScaler(LightningFlow):
MyPythonServer,
min_replicas=1,
max_replicas=8,
autoscale_interval=10,
scale_out_interval=10,
scale_in_interval=10,
)
)
Expand Down Expand Up @@ -455,7 +457,8 @@ def scale(self, replicas: int, metrics: dict) -> int:
MyPythonServer,
min_replicas=1,
max_replicas=8,
autoscale_interval=10,
scale_out_interval=10,
scale_in_interval=10,
max_batch_size=8, # for auto batching
timeout_batching=1, # for auto batching
)
Expand All @@ -467,7 +470,8 @@ def __init__(
work_cls: Type[LightningWork],
min_replicas: int = 1,
max_replicas: int = 4,
autoscale_interval: int = 10,
scale_out_interval: int = 10,
scale_in_interval: int = 10,
max_batch_size: int = 8,
timeout_batching: float = 1,
endpoint: str = "api/predict",
Expand All @@ -486,7 +490,8 @@ def __init__(

self._input_type = input_type
self._output_type = output_type
self.autoscale_interval = autoscale_interval
self.scale_out_interval = scale_out_interval
self.scale_in_interval = scale_in_interval
self.max_batch_size = max_batch_size

if max_replicas < min_replicas:
Expand Down Expand Up @@ -612,11 +617,6 @@ def num_pending_works(self) -> int:

def autoscale(self) -> None:
"""Adjust the number of works based on the target number returned by ``self.scale``."""
if time.time() - self._last_autoscale < self.autoscale_interval:
return

self.load_balancer.update_servers(self.workers)

metrics = {
"pending_requests": self.num_pending_requests,
"pending_works": self.num_pending_works,
Expand All @@ -628,23 +628,29 @@ def autoscale(self) -> None:
min(self.max_replicas, self.scale(self.num_replicas, metrics)),
)

# upscale
num_workers_to_add = num_target_workers - self.num_replicas
for _ in range(num_workers_to_add):
logger.info(f"Upscaling from {self.num_replicas} to {self.num_replicas + 1}")
work = self.create_work()
new_work_id = self.add_work(work)
logger.info(f"Work created: '{new_work_id}'")

# downscale
num_workers_to_remove = self.num_replicas - num_target_workers
for _ in range(num_workers_to_remove):
logger.info(f"Downscaling from {self.num_replicas} to {self.num_replicas - 1}")
removed_work_id = self.remove_work(self.num_replicas - 1)
logger.info(f"Work removed: '{removed_work_id}'")
# scale-out
if time.time() - self._last_autoscale > self.scale_out_interval:
num_workers_to_add = num_target_workers - self.num_replicas
for _ in range(num_workers_to_add):
logger.info(f"Scaling out from {self.num_replicas} to {self.num_replicas + 1}")
work = self.create_work()
# TODO: move works into structures
new_work_id = self.add_work(work)
logger.info(f"Work created: '{new_work_id}'")
if num_workers_to_add > 0:
self._last_autoscale = time.time()

# scale-in
if time.time() - self._last_autoscale > self.scale_in_interval:
num_workers_to_remove = self.num_replicas - num_target_workers
for _ in range(num_workers_to_remove):
logger.info(f"Scaling in from {self.num_replicas} to {self.num_replicas - 1}")
removed_work_id = self.remove_work(self.num_replicas - 1)
logger.info(f"Work removed: '{removed_work_id}'")
if num_workers_to_remove > 0:
self._last_autoscale = time.time()

self.load_balancer.update_servers(self.workers)
self._last_autoscale = time.time()

def configure_layout(self):
tabs = [
Expand Down
46 changes: 44 additions & 2 deletions tests/tests_app/components/serve/test_auto_scaler.py
Expand Up @@ -42,7 +42,8 @@ def test_num_replicas_not_above_max_replicas(*_):
EmptyWork,
min_replicas=1,
max_replicas=max_replicas,
autoscale_interval=0.001,
scale_out_interval=0.001,
scale_in_interval=0.001,
)

for _ in range(max_replicas + 1):
Expand All @@ -62,7 +63,8 @@ def test_num_replicas_not_belo_min_replicas(*_):
EmptyWork,
min_replicas=min_replicas,
max_replicas=4,
autoscale_interval=0.001,
scale_out_interval=0.001,
scale_in_interval=0.001,
)

for _ in range(3):
Expand Down Expand Up @@ -131,3 +133,43 @@ def test_API_ACCESS_ENDPOINT_creation():

auto_scaler.load_balancer.run()
fastapi_mock.mount.assert_called_once_with("/endpoint-info", mock.ANY, name="static")


def test_autoscaler_scale_up(monkeypatch):
monkeypatch.setattr(AutoScaler, "num_pending_works", 0)
monkeypatch.setattr(AutoScaler, "num_pending_requests", 100)
monkeypatch.setattr(AutoScaler, "scale", mock.MagicMock(return_value=1))
monkeypatch.setattr(AutoScaler, "create_work", mock.MagicMock())
monkeypatch.setattr(AutoScaler, "add_work", mock.MagicMock())

auto_scaler = AutoScaler(EmptyWork, min_replicas=0, max_replicas=4, scale_out_interval=0.001)

# Mocking the attributes
auto_scaler._last_autoscale = time.time() - 100000
auto_scaler.num_replicas = 0

# triggering scale up
auto_scaler.autoscale()
auto_scaler.scale.assert_called_once()
auto_scaler.create_work.assert_called_once()
auto_scaler.add_work.assert_called_once()


def test_autoscaler_scale_down(monkeypatch):
monkeypatch.setattr(AutoScaler, "num_pending_works", 0)
monkeypatch.setattr(AutoScaler, "num_pending_requests", 0)
monkeypatch.setattr(AutoScaler, "scale", mock.MagicMock(return_value=0))
monkeypatch.setattr(AutoScaler, "remove_work", mock.MagicMock())
monkeypatch.setattr(AutoScaler, "workers", mock.MagicMock())

auto_scaler = AutoScaler(EmptyWork, min_replicas=0, max_replicas=4, scale_in_interval=0.001)

# Mocking the attributes
auto_scaler._last_autoscale = time.time() - 100000
auto_scaler.num_replicas = 1
auto_scaler.__dict__["load_balancer"] = mock.MagicMock()

# triggering scale up
auto_scaler.autoscale()
auto_scaler.scale.assert_called_once()
auto_scaler.remove_work.assert_called_once()

0 comments on commit 3efeaaa

Please sign in to comment.