[hotfix][autoscaler] Request resources refactor2 (#12661)
* prepare for head node

* move command runner interface outside _private

* remove space

* Eric

* flake

* min_workers in multi node type

* fixing edge cases

* eric not idle

* fix target_workers to consider min_workers of node types

* idle timeout

* minor

* minor fix

* test

* lint

* eric v2

* eric 3

* min_workers constraint before bin packing

* Update resource_demand_scheduler.py

* Revert "Update resource_demand_scheduler.py"

This reverts commit 818a63a.

* reducing diff

* make get_nodes_to_launch return a dict

* merge

* weird merge fix

* auto fill instance types for AWS

* Alex/Eric

* Update doc/source/cluster/autoscaling.rst

* merge autofill and input from user

* logger.exception

* make the yaml use the default autofill

* docs Eric

* remove test_autoscaler_yaml from windows tests

* let's try changing the test a bit

* return test

* let's see

* edward

* Limit max launch concurrency

* commenting frac TODO

* move to resource demand scheduler

* use STATUS UP TO DATE

* Eric

* make logger of gc freed refs debug instead of info

* add cluster name to docker mount prefix directory

* grrR

* fix tests

* moving docker directory to sdk

* move the import to prevent circular dependency

* small fix

* ian

* fix max launch concurrency bug: treat failing nodes as pending and count only load_metrics' connected nodes as running

* small fix

* request_resources -> min workers

* test fixes

* add race condition tests

* Eric

* fixes

* semi final

* semi final

* lint

* lint

Co-authored-by: Ameer Haj Ali <ameerhajali@ameers-mbp.lan>
Co-authored-by: Alex Wu <alex@anyscale.io>
Co-authored-by: Alex Wu <itswu.alex@gmail.com>
Co-authored-by: Eric Liang <ekhliang@gmail.com>
Co-authored-by: Ameer Haj Ali <ameerhajali@Ameers-MacBook-Pro.local>
6 people committed Dec 9, 2020
1 parent 343b479 commit a4dbb27
Showing 4 changed files with 596 additions and 92 deletions.
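
For readers without context on this refactor: the change revolves around how the autoscaler honors ray.autoscaler.sdk.request_resources(), both when deciding how many nodes to launch and, per this commit, when deciding which idle nodes it may terminate. The snippet below is a minimal usage sketch only, assuming the public SDK call as it existed around Ray 1.1; the argument values are illustrative and not taken from this commit.

# Illustrative usage of the request_resources() SDK call this commit refactors.
import ray
from ray.autoscaler.sdk import request_resources

ray.init(address="auto")  # run from a driver connected to the cluster

# Ask the autoscaler to provision enough nodes for 16 CPUs of aggregate work.
request_resources(num_cpus=16)

# Or request specific bundles, e.g. two 1-GPU slots.
request_resources(bundles=[{"GPU": 1}, {"GPU": 1}])

# Overriding with a zero request lets idle nodes scale back toward min_workers.
request_resources(num_cpus=0)
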
146 changes: 121 additions & 25 deletions python/ray/autoscaler/_private/autoscaler.py
@@ -13,16 +13,17 @@

from ray.experimental.internal_kv import _internal_kv_put, \
_internal_kv_initialized
from ray.autoscaler.tags import (TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG,
TAG_RAY_FILE_MOUNTS_CONTENTS,
TAG_RAY_NODE_STATUS, TAG_RAY_NODE_KIND,
TAG_RAY_USER_NODE_TYPE, STATUS_UP_TO_DATE,
NODE_KIND_WORKER, NODE_KIND_UNMANAGED)
from ray.autoscaler.tags import (
TAG_RAY_LAUNCH_CONFIG, TAG_RAY_RUNTIME_CONFIG,
TAG_RAY_FILE_MOUNTS_CONTENTS, TAG_RAY_NODE_STATUS, TAG_RAY_NODE_KIND,
TAG_RAY_USER_NODE_TYPE, STATUS_UP_TO_DATE, NODE_KIND_WORKER,
NODE_KIND_UNMANAGED, NODE_KIND_HEAD)
from ray.autoscaler._private.providers import _get_node_provider
from ray.autoscaler._private.updater import NodeUpdaterThread
from ray.autoscaler._private.node_launcher import NodeLauncher
from ray.autoscaler._private.resource_demand_scheduler import \
ResourceDemandScheduler, NodeType, NodeID
get_bin_pack_residual, ResourceDemandScheduler, NodeType, NodeID, NodeIP, \
ResourceDict
from ray.autoscaler._private.util import ConcurrentCounter, validate_config, \
with_head_node_ip, hash_launch_conf, hash_runtime_conf, \
DEBUG_AUTOSCALING_STATUS, DEBUG_AUTOSCALING_ERROR
@@ -164,27 +165,35 @@ def _update(self):
last_used = self.load_metrics.last_used_time_by_ip
horizon = now - (60 * self.config["idle_timeout_minutes"])

nodes_to_terminate = []
nodes_to_terminate: Dict[NodeID, bool] = []
node_type_counts = collections.defaultdict(int)
# Sort based on last used to make sure to keep min_workers that
# were most recently used. Otherwise, _keep_min_workers_of_node_type
# might keep a node that should be terminated.
for node_id in self._sort_based_on_last_used(nodes, last_used):
sorted_node_ids = self._sort_based_on_last_used(nodes, last_used)
# Don't terminate nodes needed by request_resources()
nodes_allowed_to_terminate: Dict[NodeID, bool] = {}
if self.resource_demand_vector:
nodes_allowed_to_terminate = self._get_nodes_allowed_to_terminate(
sorted_node_ids)

for node_id in sorted_node_ids:
# Make sure to not kill idle node types if the number of workers
# of that type is lower/equal to the min_workers of that type.
if self._keep_min_worker_of_node_type(
node_id,
node_type_counts) and self.launch_config_ok(node_id):
# of that type is lower/equal to the min_workers of that type
# or it is needed for request_resources().
if (self._keep_min_worker_of_node_type(node_id, node_type_counts)
or not nodes_allowed_to_terminate.get(
node_id, True)) and self.launch_config_ok(node_id):
continue

node_ip = self.provider.internal_ip(node_id)
if node_ip in last_used and last_used[node_ip] < horizon:
logger.info("StandardAutoscaler: "
"{}: Terminating idle node".format(node_id))
"{}: Terminating idle node.".format(node_id))
nodes_to_terminate.append(node_id)
elif not self.launch_config_ok(node_id):
logger.info("StandardAutoscaler: "
"{}: Terminating outdated node".format(node_id))
"{}: Terminating outdated node.".format(node_id))
nodes_to_terminate.append(node_id)

if nodes_to_terminate:
@@ -198,7 +207,7 @@ def _update(self):
len(nodes_to_terminate)) > self.config["max_workers"] and nodes:
to_terminate = nodes.pop()
logger.info("StandardAutoscaler: "
"{}: Terminating unneeded node".format(to_terminate))
"{}: Terminating unneeded node.".format(to_terminate))
nodes_to_terminate.append(to_terminate)

if nodes_to_terminate:
@@ -226,15 +235,23 @@ def _update(self):
if not updater.is_alive():
completed.append(node_id)
if completed:
nodes_to_terminate: List[NodeID] = []
for node_id in completed:
if self.updaters[node_id].exitcode == 0:
self.num_successful_updates[node_id] += 1
# Mark the node as active to prevent the node recovery
# logic immediately trying to restart Ray on the new node.
self.load_metrics.mark_active(
self.provider.internal_ip(node_id))
else:
logger.error(f"StandardAutoscaler: {node_id}: Terminating "
"failed to setup/initialize node.")
nodes_to_terminate.append(node_id)
self.num_failed_updates[node_id] += 1
del self.updaters[node_id]
# Mark the node as active to prevent the node recovery logic
# immediately trying to restart Ray on the new node.
self.load_metrics.mark_active(self.provider.internal_ip(node_id))
if nodes_to_terminate:
self.provider.terminate_nodes(nodes_to_terminate)

nodes = self.workers()
self.log_info_string(nodes)

@@ -266,24 +283,103 @@ def _sort_based_on_last_used(self, nodes: List[NodeID],
last_used: Dict[str, float]) -> List[NodeID]:
"""Sort the nodes based on the last time they were used.
The first item in the return list is the least recently used.
The first item in the return list is the most recently used.
"""
updated_last_used = copy.deepcopy(last_used)
now = time.time()
# Add the unconnected nodes as the least recently used (the end of
# list). This prioritizes connected nodes.
least_recently_used = -1
for node_id in nodes:
node_ip = self.provider.internal_ip(node_id)
if node_ip not in updated_last_used:
updated_last_used[node_ip] = now
updated_last_used[node_ip] = least_recently_used

def last_time_used(node_id: NodeID):
node_ip = self.provider.internal_ip(node_id)
return updated_last_used[node_ip]

return sorted(nodes, key=last_time_used, reverse=True)

def _keep_min_worker_of_node_type(self, node_id: NodeID,
node_type_counts: Dict[NodeType, int]):
"""Returns if workers of node_type should be terminated.
def _get_nodes_allowed_to_terminate(
self, sorted_node_ids: List[NodeID]) -> Dict[NodeID, bool]:
# TODO(ameer): try merging this with resource_demand_scheduler
# code responsible for adding nodes for request_resources().
"""Returns the nodes allowed to terminate for request_resources().
Args:
sorted_node_ids: the node ids sorted based on last used (LRU last).
Returns:
nodes_allowed_to_terminate: whether the node id is allowed to
terminate or not.
"""
nodes_allowed_to_terminate: Dict[NodeID, bool] = {}
head_node_resources: ResourceDict = copy.deepcopy(
self.available_node_types[self.config["head_node_type"]][
"resources"])
if not head_node_resources:
# Legacy yaml might include {} in the resources field.
# TODO(ameer): this is somewhat duplicated in
# resource_demand_scheduler.py.
head_id: List[NodeID] = self.provider.non_terminated_nodes({
TAG_RAY_NODE_KIND: NODE_KIND_HEAD
})
if head_id:
head_ip = self.provider.internal_ip(head_id[0])
static_nodes: Dict[
NodeIP,
ResourceDict] = \
self.load_metrics.get_static_node_resources_by_ip()
head_node_resources = static_nodes[head_ip]
else:
head_node_resources = {}

max_node_resources: List[ResourceDict] = [head_node_resources]
resource_demand_vector_worker_node_ids = []
# Get max resources on all the non terminated nodes.
for node_id in sorted_node_ids:
tags = self.provider.node_tags(node_id)
if TAG_RAY_USER_NODE_TYPE in tags:
node_type = tags[TAG_RAY_USER_NODE_TYPE]
node_resources: ResourceDict = copy.deepcopy(
self.available_node_types[node_type]["resources"])
if not node_resources:
# Legacy yaml might include {} in the resources field.
static_nodes: Dict[
NodeIP,
ResourceDict] = \
self.load_metrics.get_static_node_resources_by_ip()
node_ip = self.provider.internal_ip(node_id)
node_resources = static_nodes.get(node_ip, {})
max_node_resources.append(node_resources)
resource_demand_vector_worker_node_ids.append(node_id)
# Since it is sorted based on last used, we "keep" nodes that are
# most recently used when we binpack. We assume get_bin_pack_residual
# is following the given order here.
used_resource_requests: List[ResourceDict]
_, used_resource_requests = \
get_bin_pack_residual(max_node_resources,
self.resource_demand_vector)
# Remove the first entry (the head node).
max_node_resources.pop(0)
# Remove the first entry (the head node).
used_resource_requests.pop(0)
for i, node_id in enumerate(resource_demand_vector_worker_node_ids):
if used_resource_requests[i] == max_node_resources[i] \
and max_node_resources[i]:
# No resources of the node were needed for request_resources().
# max_node_resources[i] is an empty dict for legacy yamls
# before the node is connected.
nodes_allowed_to_terminate[node_id] = True
else:
nodes_allowed_to_terminate[node_id] = False
return nodes_allowed_to_terminate

def _keep_min_worker_of_node_type(
self, node_id: NodeID,
node_type_counts: Dict[NodeType, int]) -> bool:
"""Returns if workers of node_type can be terminated.
The worker cannot be terminated to respect min_workers constraint.
Receives the counters of running nodes so far and determines if idle
node_id should be terminated or not. It also updates the counters
@@ -293,7 +389,7 @@ def _keep_min_worker_of_node_type(self, node_id: NodeID,
node_type_counts(Dict[NodeType, int]): The non_terminated node
types counted so far.
Returns:
bool: if workers of node_types should be terminated or not.
bool: if workers of node_types can be terminated or not.
"""
tags = self.provider.node_tags(node_id)
if TAG_RAY_USER_NODE_TYPE in tags:
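
To summarize the core addition outside the diff view: _get_nodes_allowed_to_terminate() bin-packs the outstanding request_resources() demand onto the head node plus the non-terminated workers (most recently used first) and only treats a worker as safe to terminate when the packing leaves its resources completely untouched. The sketch below is a simplified, self-contained illustration of that idea, not the Ray implementation; bin_pack_residual and nodes_allowed_to_terminate are hypothetical stand-ins for get_bin_pack_residual and the private autoscaler method, and the node shapes in the example are made up.

# Minimal sketch (not the Ray implementation) of "pack the requested
# resources onto existing nodes, then terminate only untouched workers".
from typing import Dict, List, Tuple

ResourceDict = Dict[str, float]


def bin_pack_residual(node_resources: List[ResourceDict],
                      demands: List[ResourceDict]
                      ) -> Tuple[List[ResourceDict], List[ResourceDict]]:
    """Greedily place each demand on the first node that can hold it.

    Returns (unfulfilled demands, per-node resources left over after packing).
    """
    remaining = [dict(r) for r in node_resources]
    unfulfilled: List[ResourceDict] = []
    for demand in demands:
        for avail in remaining:
            if all(avail.get(k, 0) >= v for k, v in demand.items()):
                for k, v in demand.items():
                    avail[k] = avail.get(k, 0) - v
                break
        else:
            unfulfilled.append(demand)
    return unfulfilled, remaining


def nodes_allowed_to_terminate(head: ResourceDict,
                               workers: List[Tuple[str, ResourceDict]],
                               demands: List[ResourceDict]) -> Dict[str, bool]:
    """A worker may terminate only if packing left its resources untouched."""
    shapes = [head] + [resources for _, resources in workers]
    _, leftover = bin_pack_residual(shapes, demands)
    leftover = leftover[1:]  # drop the head-node entry, like the .pop(0) above
    return {
        node_id: leftover[i] == resources and bool(resources)
        for i, (node_id, resources) in enumerate(workers)
    }


if __name__ == "__main__":
    demands = [{"CPU": 4}, {"CPU": 4}, {"GPU": 1}]
    workers = [("w1", {"CPU": 8}),
               ("w2", {"GPU": 1, "CPU": 4}),
               ("w3", {"CPU": 8})]
    # w1 and w2 absorb the demand, so only w3 may be terminated.
    print(nodes_allowed_to_terminate({"CPU": 2}, workers, demands))
    # -> {'w1': False, 'w2': False, 'w3': True}

Note the companion change to _sort_based_on_last_used(): nodes that never connected are now treated as least recently used, so the packing above prefers to keep connected, recently used workers and lets stale ones drain.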
