Support resurrecting blacklisted hosts
This adds support for resurrecting blacklisted hosts in elastic mode.
Currently, hosts that get blacklisted remain in the blacklist for the lifetime of the job.
This cannot handle transient host failures or a scale-up after a scale-down.
This is especially a problem for the Kubeflow mpi-operator on Kubernetes, as it always
assigns pods the same known hostnames from its hostfile.

This patch allows blacklisted hosts to become whitelisted again after a cooldown period.
For repeated failures the cooldown period grows with an exponential backoff delay (roughly
20s, 40s, 80s, and so on) and is capped at 5 minutes.
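
A rough sketch of the delay computation this patch adds (the constant names mirror the ones introduced in discovery.py below; the helper function here is only illustrative, not part of the patch):

```python
import random

# Constants introduced by this patch in horovod/runner/elastic/discovery.py.
COOLDOWN_UPPER_LIMIT_SECONDS = 5 * 60   # cap on any cooldown period
COOLDOWN_LOWER_LIMIT_SECONDS = 10       # minimum cooldown period
COOLDOWN_PERIOD_DELAY_SECONDS = 10      # base unit of the backoff delay

def cooldown_delay_seconds(blacklist_count):
    """Illustrative restatement of the delay computed in HostState._set_cooldown_period."""
    # The base delay doubles with each repeated failure, plus up to 10s of random jitter.
    delay = (COOLDOWN_PERIOD_DELAY_SECONDS * (1 << blacklist_count)
             + random.uniform(0, 1) * COOLDOWN_PERIOD_DELAY_SECONDS)
    # Clamp the result between the lower (10s) and upper (5 minute) limits.
    return max(COOLDOWN_LOWER_LIMIT_SECONDS, min(COOLDOWN_UPPER_LIMIT_SECONDS, delay))

# First failure -> ~20s, second -> ~40s, third -> ~80s, ..., capped at 300s.
```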

Signed-off-by: Abin Shahab <ashahab@linkedin.com>
ashahab committed Dec 13, 2021
1 parent df18797 commit ae0ad53
Showing 4 changed files with 198 additions and 12 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -12,6 +12,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

- TensorFlow: Added in-place broadcasting of variables. ([#3128](https://github.com/horovod/horovod/pull/3128))

- Added support for resurrecting blacklisted hosts. ([#1926](https://github.com/horovod/horovod/issues/1926))

### Changed

### Deprecated
8 changes: 6 additions & 2 deletions docs/elastic.rst
@@ -332,8 +332,12 @@ The maximum np can be used to cap the number of processes (to prevent over-utili
as a reference point for learning rate scales and data partitions (in cases where these need to be held constant
regardless of the current number of workers). If unspecified, maximum np also defaults to ``-np``.

Instances that fail will be added to a blacklist, as they may have faulty hardware. Ranks that fail repeatedly
will result in job failure, as it may be the case that the training process cannot make progress.
Instances that fail will be added to a blacklist, as they may have faulty hardware. Hosts will remain in the blacklist for a cooldown period.
After the cooldown period ends, the host will be whitelisted again. This accounts for transient failures and for cases where the same host
is added back to a job. Currently the minimum cooldown period is configured to be 10 seconds.
For repeated failures the cooldown period grows with an exponential backoff delay (roughly 20s, 40s, 80s, and so on) and is capped
at 5 minutes.
Ranks that fail repeatedly will result in job failure, as it may be the case that the training process cannot make progress.
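
As a minimal sketch of this lifecycle (using the internal ``HostState`` class this change adds; the driver's discovery loop normally performs these calls, so this is illustrative only):

.. code-block:: python

    from horovod.runner.elastic.discovery import HostState

    state = HostState()
    state.blacklist()                  # first failure: a ~20s cooldown (plus jitter) begins
    assert state.is_blacklisted()
    assert not state.is_resurrected()  # still inside the cooldown window

    # Once the cooldown expires, is_resurrected() returns True and the
    # discovery loop calls whitelist() to return the host to service.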


Running on Ray
75 changes: 66 additions & 9 deletions horovod/runner/elastic/discovery.py
@@ -15,20 +15,27 @@

import io
import logging
import random
import threading

import time
from collections import defaultdict

from horovod.runner.common.util import safe_shell_exec
from horovod.runner.elastic.worker import HostUpdateResult


# Maximum time hosts are in cooldown period
COOLDOWN_UPPER_LIMIT_SECONDS = 5 * 60
# Minimum time hosts are in cooldown
COOLDOWN_LOWER_LIMIT_SECONDS = 10
# Unit by which the cooldown grows for repeated failures
COOLDOWN_PERIOD_DELAY_SECONDS = 10
class HostState(object):

def __init__(self):
self._event = threading.Event()

# TODO(travis): blacklisted hosts should have a timeout period that increases with each failure
self._blacklisted = False
self._blacklist_count = 0
self._cooldown_period_end_ts = 0

def get_event(self):
if self._event.is_set():
@@ -39,13 +46,46 @@ def get_event(self):
def set_event(self):
self._event.set()

def _in_cooldown_period(self, current_time):
return self._cooldown_period_end_ts > current_time


def _set_cooldown_period(self, current_time):
self._blacklist_count += 1
def _exponential_backoff_time():
cooldown_delay = COOLDOWN_PERIOD_DELAY_SECONDS * (1 << self._blacklist_count) + (random.uniform(0, 1) * COOLDOWN_PERIOD_DELAY_SECONDS)
logging.debug(f"{self._blacklist_count}:{self._cooldown_period_end_ts} cooldown_delay: {cooldown_delay}")
return max(COOLDOWN_LOWER_LIMIT_SECONDS, min(COOLDOWN_UPPER_LIMIT_SECONDS, cooldown_delay))
cooldown_delta_seconds = _exponential_backoff_time()
self._cooldown_period_end_ts = current_time + cooldown_delta_seconds
logging.debug(f"cooldown delta seconds: {cooldown_delta_seconds}")

def blacklist(self):
"""Moves this host to a blacklist, and starts the cooldown period."""
self._blacklisted = True
now = time.time()
if self._in_cooldown_period(now):
return
self._set_cooldown_period(now)
self.set_event()

def whitelist(self):
"""Ends the cooldown period and moves this host out of blacklist."""
self._cooldown_period_end_ts = 0
self._blacklisted = False

def is_blacklisted(self):
"""Checks if the host is in the blacklist."""
return self._blacklisted

def is_resurrected(self):
"""Checks if host is in an expired cooldown period."""
if self._cooldown_period_end_ts > 0:
return not self._in_cooldown_period(time.time())
return False



class DiscoveredHosts(object):
def __init__(self, host_slots, host_assignment_order):
@@ -76,6 +116,9 @@ def update(self, hosts_state):
if not hosts_state[host].is_blacklisted()]
return self

def __str__(self):
return f"slots: {self._host_slots} order: {self._host_assignment_order}"


class HostManager(object):
def __init__(self, discovery):
@@ -84,7 +127,6 @@ def __init__(self, discovery):
self._discovery = discovery

def update_available_hosts(self):
# TODO(travis): also check for hosts removed from the blacklist in the future
def check_update(cur_host_slots, prev_host_slots):
res = HostUpdateResult.no_update

@@ -103,17 +145,32 @@ def check_update(cur_host_slots, prev_host_slots):
elif cur_host_slots[h] < prev_host_slots[h]:
# h has removed some slots
res |= HostUpdateResult.removed
elif self._hosts_state[h].is_resurrected():
res |= HostUpdateResult.added
return res

prev_host_slots = self._current_hosts.host_slots
prev_host_assignment_order = self._current_hosts.host_assignment_order
host_slots = self._discovery.find_available_hosts_and_slots()
if prev_host_slots != host_slots:
available_hosts = set([host for host in host_slots.keys() if not self._hosts_state[host].is_blacklisted()])

def whitelist_all_hosts():
for host in host_slots.keys():
if self._hosts_state[host].is_resurrected():
self._hosts_state[host].whitelist()

def has_resurrected_hosts():
resurrected_hosts = [host for host in host_slots.keys() if self._hosts_state[host].is_resurrected()]
return len(resurrected_hosts) > 0

if prev_host_slots != host_slots or has_resurrected_hosts():
available_hosts = set([host for host in host_slots.keys() \
if not (self._hosts_state[host].is_blacklisted() and not self._hosts_state[host].is_resurrected())])
host_assignment_order = HostManager.order_available_hosts(available_hosts, prev_host_assignment_order)
self._current_hosts = DiscoveredHosts(host_slots=host_slots,
host_assignment_order=host_assignment_order)
return check_update(self._current_hosts.host_slots, prev_host_slots)
host_update_state = check_update(self._current_hosts.host_slots, prev_host_slots)
whitelist_all_hosts()
return host_update_state
else:
return HostUpdateResult.no_update

@@ -123,7 +180,7 @@ def current_hosts(self):

def blacklist(self, host):
if not self._hosts_state[host].is_blacklisted():
logging.warning('blacklist failing host: {}'.format(host))
logging.debug('blacklist failing host: {}'.format(host))
self._hosts_state[host].blacklist()

def is_blacklisted(self, host):
125 changes: 124 additions & 1 deletion test/integration/elastic_common.py
@@ -73,7 +73,7 @@ def __init__(self, training_script, *args, **kwargs):
super(BaseElasticTests, self).__init__(*args, **kwargs)

def _run(self, discovery_schedule=None, exit_schedule=None, exit_mode='exception',
np=2, min_np=2, max_np=4, hosts=None, reset_limit=None):
np=2, min_np=2, max_np=4, hosts=None, reset_limit=None, epoch_wait=None, epochs=None):
if not discovery_schedule and not hosts:
raise ValueError('at least one of discovery schedule or hosts must be given')

@@ -96,6 +96,10 @@ def _run(self, discovery_schedule=None, exit_schedule=None, exit_mode='exception
command_args += ['python', self._training_script, '--logfile', logfile]
if discovery_schedule:
command_args += ['--discovery-schedule', json.dumps(discovery_schedule)]
if epoch_wait:
command_args += ['--epoch-wait', json.dumps(epoch_wait)]
if epochs:
command_args += ['--epochs', json.dumps(epochs)]
if exit_schedule:
command_args += ['--exit-schedule', json.dumps(exit_schedule),
'--exit-mode', exit_mode]
@@ -261,3 +265,122 @@ def test_reset_limit(self, mock_get_min_start_hosts):
# Job should succeed with reset_limit=2
results = self._run(discovery_schedule, np=2, min_np=2, max_np=4, reset_limit=2)
self.assertEqual(len(results), 3)

@mock.patch('horovod.runner.elastic.discovery.COOLDOWN_PERIOD_DELAY_SECONDS', 1)
@mock.patch('horovod.runner.elastic.discovery.COOLDOWN_LOWER_LIMIT_SECONDS', 1)
@mock.patch('horovod.runner.elastic.discovery.COOLDOWN_UPPER_LIMIT_SECONDS', 1)
@mock.patch('horovod.runner.elastic.driver.DISCOVER_HOSTS_FREQUENCY_SECS', 0.01)
@mock.patch('horovod.runner.gloo_run._get_min_start_hosts', return_value=1)
def test_resurrecting_blacklisted_hosts(self, mock_get_min_start_hosts):
"""Put a host on the blacklist and then resurrect"""
for exit_mode in ['exception', 'kill']:
discovery_schedule = [
(0, ['localhost:2', '127.0.0.1:2']),
(1, ['localhost:2', '127.0.0.1:2']),
(None, ['localhost:2', '127.0.0.1:2']),
]
exit_schedule = {
str((1, 0)): [1],
}

results = self._run(discovery_schedule=discovery_schedule,
exit_schedule=exit_schedule, exit_mode=exit_mode, epoch_wait=5)

self.assertEqual(len(results), 3)
self.assertEqual(results[0]['start_rank'], 0)
self.assertEqual(results[0]['size'], 4)
self.assertEqual(results[0]['rendezvous'], 1)

self.assertEqual(results[1]['start_rank'], 2)
self.assertEqual(results[1]['size'], 2)
self.assertEqual(results[1]['rendezvous'], 2)

self.assertEqual(results[2]['start_rank'], 2)
self.assertEqual(results[2]['size'], 4)
self.assertEqual(results[2]['rendezvous'], 3)


@mock.patch('horovod.runner.elastic.discovery.COOLDOWN_PERIOD_DELAY_SECONDS', 2)
@mock.patch('horovod.runner.elastic.discovery.COOLDOWN_LOWER_LIMIT_SECONDS', 2)
@mock.patch('horovod.runner.elastic.driver.DISCOVER_HOSTS_FREQUENCY_SECS', 0.01)
@mock.patch('horovod.runner.gloo_run._get_min_start_hosts', return_value=1)
def test_resurrecting_blacklisted_hosts_exponential_backoff(self, mock_get_min_start_hosts):
"""Ensure that delay times are longer for multiple failures"""
for exit_mode in ['exception', 'kill']:
num_epochs = 5
discovery_schedule = [
(0, ['localhost:2', '127.0.0.1:2']),
(2, ['localhost:2', '127.0.0.1:2']),
(None, ['localhost:2', '127.0.0.1:2']),
]
exit_schedule = {
str((1, 0)): [3],
str((3, 0)): [3],
}

results = self._run(discovery_schedule=discovery_schedule,
exit_schedule=exit_schedule, exit_mode=exit_mode, epoch_wait=5, epochs=num_epochs)

self.assertEqual(len(results), num_epochs)
self.assertEqual(results[0]['start_rank'], 0)
self.assertEqual(results[0]['size'], 4)
self.assertEqual(results[0]['rendezvous'], 1)

self.assertEqual(results[1]['start_rank'], 0)
self.assertEqual(results[1]['size'], 2)
self.assertEqual(results[1]['rendezvous'], 2)

self.assertEqual(results[2]['start_rank'], 0)
self.assertEqual(results[2]['size'], 4)
self.assertEqual(results[2]['rendezvous'], 3)

self.assertEqual(results[3]['start_rank'], 0)
self.assertEqual(results[3]['size'], 2)
self.assertEqual(results[3]['rendezvous'], 4)

self.assertEqual(results[4]['start_rank'], 0)
self.assertEqual(results[4]['size'], 2)
self.assertEqual(results[4]['rendezvous'], 4)

@mock.patch('horovod.runner.elastic.discovery.COOLDOWN_PERIOD_DELAY_SECONDS', 2)
@mock.patch('horovod.runner.elastic.discovery.COOLDOWN_LOWER_LIMIT_SECONDS', 2)
@mock.patch('horovod.runner.elastic.discovery.COOLDOWN_UPPER_LIMIT_SECONDS', 2)
@mock.patch('horovod.runner.elastic.driver.DISCOVER_HOSTS_FREQUENCY_SECS', 0.01)
@mock.patch('horovod.runner.gloo_run._get_min_start_hosts', return_value=1)
def test_resurrecting_blacklisted_hosts_upper_limit(self, mock_get_min_start_hosts):
"""Ensure that delay times are not so long that they go beyond upper limit"""
for exit_mode in ['exception', 'kill']:
num_epochs = 5
discovery_schedule = [
(0, ['localhost:2', '127.0.0.1:2']),
(2, ['localhost:2', '127.0.0.1:2']),
(None, ['localhost:2', '127.0.0.1:2']),
]
exit_schedule = {
str((1, 0)): [3],
str((3, 0)): [3],
}

results = self._run(discovery_schedule=discovery_schedule,
exit_schedule=exit_schedule, exit_mode=exit_mode, epoch_wait=5, epochs=num_epochs)

self.assertEqual(len(results), num_epochs)
self.assertEqual(results[0]['start_rank'], 0)
self.assertEqual(results[0]['size'], 4)
self.assertEqual(results[0]['rendezvous'], 1)

self.assertEqual(results[1]['start_rank'], 0)
self.assertEqual(results[1]['size'], 2)
self.assertEqual(results[1]['rendezvous'], 2)

self.assertEqual(results[2]['start_rank'], 0)
self.assertEqual(results[2]['size'], 4)
self.assertEqual(results[2]['rendezvous'], 3)

self.assertEqual(results[3]['start_rank'], 0)
self.assertEqual(results[3]['size'], 2)
self.assertEqual(results[3]['rendezvous'], 4)

self.assertEqual(results[4]['start_rank'], 0)
self.assertEqual(results[4]['size'], 4)
self.assertEqual(results[4]['rendezvous'], 5)
