Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(adaptive-core): improved readability recommendations #8431

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions distributed/deploy/adaptive.py
Expand Up @@ -8,7 +8,7 @@
import dask.config
from dask.utils import parse_timedelta

from distributed.deploy.adaptive_core import AdaptiveCore
from distributed.deploy.adaptive_core import AdaptiveCore, Recommendation

Check warning on line 11 in distributed/deploy/adaptive.py

View check run for this annotation

Codecov / codecov/patch

distributed/deploy/adaptive.py#L11

Added line #L11 was not covered by tests
from distributed.protocol import pickle
from distributed.utils import log_errors

Expand Down Expand Up @@ -152,7 +152,7 @@
target_duration=self.target_duration
)

async def recommendations(self, target: int) -> dict:
async def recommendations(self, target: int) -> Recommendation:

Check warning on line 155 in distributed/deploy/adaptive.py

View check run for this annotation

Codecov / codecov/patch

distributed/deploy/adaptive.py#L155

Added line #L155 was not covered by tests
if len(self.plan) != len(self.requested):
# Ensure that the number of planned and requested workers
# are in sync before making recommendations.
Expand Down
44 changes: 26 additions & 18 deletions distributed/deploy/adaptive_core.py
Expand Up @@ -5,10 +5,11 @@
from collections import defaultdict, deque
from collections.abc import Iterable
from datetime import timedelta
from typing import TYPE_CHECKING, cast
from typing import TYPE_CHECKING, Literal, TypedDict, cast

import tlz as toolz
from tornado.ioloop import IOLoop
from typing_extensions import NotRequired

Check warning on line 12 in distributed/deploy/adaptive_core.py

View check run for this annotation

Codecov / codecov/patch

distributed/deploy/adaptive_core.py#L12

Added line #L12 was not covered by tests

import dask.config
from dask.utils import parse_timedelta
Expand All @@ -23,6 +24,15 @@
logger = logging.getLogger(__name__)


RecommendationStatus = Literal["up", "down", "same"]

Check warning on line 27 in distributed/deploy/adaptive_core.py

View check run for this annotation

Codecov / codecov/patch

distributed/deploy/adaptive_core.py#L27

Added line #L27 was not covered by tests


class Recommendation(TypedDict):
status: RecommendationStatus
workers: NotRequired[set[WorkerState]]
n: NotRequired[int]

Check warning on line 33 in distributed/deploy/adaptive_core.py

View check run for this annotation

Codecov / codecov/patch

distributed/deploy/adaptive_core.py#L30-L33

Added lines #L30 - L33 were not covered by tests


class AdaptiveCore:
"""
The core logic for adaptive deployments, with none of the cluster details
Expand Down Expand Up @@ -169,13 +179,13 @@

return n

async def scale_down(self, n: int) -> None:
async def scale_down(self, workers: Iterable) -> None:

Check warning on line 182 in distributed/deploy/adaptive_core.py

View check run for this annotation

Codecov / codecov/patch

distributed/deploy/adaptive_core.py#L182

Added line #L182 was not covered by tests
raise NotImplementedError()

async def scale_up(self, workers: Iterable) -> None:
async def scale_up(self, n: int) -> None:

Check warning on line 185 in distributed/deploy/adaptive_core.py

View check run for this annotation

Codecov / codecov/patch

distributed/deploy/adaptive_core.py#L185

Added line #L185 was not covered by tests
raise NotImplementedError()

async def recommendations(self, target: int) -> dict:
async def recommendations(self, target: int) -> Recommendation:

Check warning on line 188 in distributed/deploy/adaptive_core.py

View check run for this annotation

Codecov / codecov/patch

distributed/deploy/adaptive_core.py#L188

Added line #L188 was not covered by tests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually a breaking change since it is possible to subclass Adaptive and all consumers of recommendations would break.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a very good point. I rewrote it into a TypedDict, so that it should remain backwards compatible while still improving readability.

"""
Make scale up/down recommendations based on current state and target
"""
Expand All @@ -185,11 +195,11 @@

if target == len(plan):
self.close_counts.clear()
return {"status": "same"}
return Recommendation(status="same")

Check warning on line 198 in distributed/deploy/adaptive_core.py

View check run for this annotation

Codecov / codecov/patch

distributed/deploy/adaptive_core.py#L198

Added line #L198 was not covered by tests

if target > len(plan):
self.close_counts.clear()
return {"status": "up", "n": target}
return Recommendation(status="up", n=target)

Check warning on line 202 in distributed/deploy/adaptive_core.py

View check run for this annotation

Codecov / codecov/patch

distributed/deploy/adaptive_core.py#L202

Added line #L202 was not covered by tests

# target < len(plan)
not_yet_arrived = requested - observed
Expand All @@ -212,9 +222,9 @@
del self.close_counts[k]

if firmly_close:
return {"status": "down", "workers": list(firmly_close)}
return Recommendation(status="down", workers=firmly_close)

Check warning on line 225 in distributed/deploy/adaptive_core.py

View check run for this annotation

Codecov / codecov/patch

distributed/deploy/adaptive_core.py#L225

Added line #L225 was not covered by tests
else:
return {"status": "same"}
return Recommendation(status="same")

Check warning on line 227 in distributed/deploy/adaptive_core.py

View check run for this annotation

Codecov / codecov/patch

distributed/deploy/adaptive_core.py#L227

Added line #L227 was not covered by tests

async def adapt(self) -> None:
"""
Expand All @@ -229,18 +239,16 @@

try:
target = await self.safe_target()
recommendations = await self.recommendations(target)

if recommendations["status"] != "same":
self.log.append((time(), dict(recommendations)))
recommendation = await self.recommendations(target)

Check warning on line 242 in distributed/deploy/adaptive_core.py

View check run for this annotation

Codecov / codecov/patch

distributed/deploy/adaptive_core.py#L242

Added line #L242 was not covered by tests

status = recommendations.pop("status")
if status == "same":
if recommendation["status"] == "same":

Check warning on line 244 in distributed/deploy/adaptive_core.py

View check run for this annotation

Codecov / codecov/patch

distributed/deploy/adaptive_core.py#L244

Added line #L244 was not covered by tests
return
if status == "up":
await self.scale_up(**recommendations)
if status == "down":
await self.scale_down(**recommendations)
else:
self.log.append((time(), cast(dict, recommendation)))
if recommendation["status"] == "up":
await self.scale_up(recommendation["n"])
elif recommendation["status"] == "down":
await self.scale_down(recommendation["workers"])

Check warning on line 251 in distributed/deploy/adaptive_core.py

View check run for this annotation

Codecov / codecov/patch

distributed/deploy/adaptive_core.py#L247-L251

Added lines #L247 - L251 were not covered by tests
except OSError:
if status != "down":
logger.error("Adaptive stopping due to error", exc_info=True)
Expand Down