RayExecutor V2: Dynamic executor for elastic and static jobs
This resolves horovod#3190 by adding elastic parameters to the RayExecutor API. The API now supports both static (non-elastic) and elastic Horovod jobs.

Example of a static job (identical to the current RayExecutor usage):
```python
import ray
import horovod.torch as hvd
from horovod.ray import RayExecutor

ray.init()
settings = RayExecutor.create_settings(verbose=True)
num_workers = 4
executor = RayExecutor(settings, num_workers=num_workers, use_gpu=True)
executor.start()

def simple_fn():
    hvd.init()
    print("hvd rank", hvd.rank())
    return hvd.rank()

result = executor.run(simple_fn)
assert len(set(result)) == num_workers

executor.shutdown()
```
Example of an elastic job:
```python
import ray
import torch
import horovod.torch as hvd
from horovod.ray import RayExecutor

ray.init()

# Model, optimizer, epochs, and on_state_reset are user-defined.
def training_fn():
    hvd.init()
    model = Model()
    torch.cuda.set_device(hvd.local_rank())

    @hvd.elastic.run
    def train(state):
        for state.epoch in range(state.epoch, epochs):
            ...
            state.commit()

    state = hvd.elastic.TorchState(model, optimizer, batch=0, epoch=0)
    state.register_reset_callbacks([on_state_reset])
    train(state)

settings = RayExecutor.create_settings(verbose=True)
executor = RayExecutor(settings, min_workers=1, use_gpu=True, cpus_per_worker=2)
executor.start()
executor.run(training_fn)
executor.shutdown()
```

Signed-off-by: Abin Shahab <ashahab@linkedin.com>
ashahab committed Nov 10, 2021
1 parent 660f7ff commit ec16f73
Showing 8 changed files with 1,236 additions and 119 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
@@ -7,13 +7,15 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
## [Unreleased] - YYYY-MM-DD

### Added
- Added elastic keyword parameters to the RayExecutor API: this API supports both static (non-elastic) and elastic Horovod jobs. ([#3190](https://github.com/horovod/horovod/issues/3190))

- TensorFlow: Added in-place broadcasting of variables. ([#3128](https://github.com/horovod/horovod/pull/3128))

### Changed

### Deprecated

- Deprecated the ElasticRayExecutor API in favor of the new RayExecutor API. ([#3190](https://github.com/horovod/horovod/issues/3190))
### Removed

### Fixed
10 changes: 6 additions & 4 deletions docs/ray.rst
@@ -110,7 +110,7 @@ A unique feature of Ray is its support for `stateful Actors <https://docs.ray.io
Elastic Ray Executor
--------------------

Ray also supports `elastic execution <elastic.rst>`_ via :ref:`the ElasticRayExecutor <horovod_ray_api>`. Similar to default Horovod, the difference between the non-elastic and elastic versions of Ray is that the hosts and number of workers is dynamically determined at runtime.
Ray also supports `elastic execution <elastic.rst>`_ via :ref:`the RayExecutor <horovod_ray_api>`. Similar to default Horovod, the difference between the non-elastic and elastic versions of Ray is that the hosts and number of workers are dynamically determined at runtime.

You must first set up `a Ray cluster`_. Ray clusters can support autoscaling for any cloud provider (AWS, GCP, Azure).

@@ -153,10 +153,12 @@ You can then attach to the underlying Ray cluster and execute the training function
.. code-block:: python
import ray
from horovod.ray import RayExecutor
ray.init(address="auto") # attach to the Ray cluster
settings = ElasticRayExecutor.create_settings(verbose=True)
executor = ElasticRayExecutor(
settings, use_gpu=True, cpus_per_slot=2)
settings = RayExecutor.create_settings(verbose=True)
executor = RayExecutor(
settings, min_workers=1, use_gpu=True, cpus_per_slot=2)
executor.start()
executor.run(training_fn)
126 changes: 126 additions & 0 deletions horovod/ray/adapter.py
@@ -0,0 +1,126 @@
from abc import ABC, abstractmethod
from typing import Dict, Callable, Any, Optional, List
from dataclasses import dataclass


@dataclass
class BaseParams:
    cpus_per_worker: int = 1
    use_gpu: bool = False
    gpus_per_worker: Optional[int] = None

    def __post_init__(self):
        if self.gpus_per_worker and not self.use_gpu:
            raise ValueError("gpus_per_worker is set, but use_gpu is False. "
                             "use_gpu must be True if gpus_per_worker is "
                             "set.")
        if self.use_gpu and isinstance(self.gpus_per_worker,
                                       int) and self.gpus_per_worker < 1:
            raise ValueError(
                f"gpus_per_worker must be >= 1: Got {self.gpus_per_worker}.")
        self.gpus_per_worker = self.gpus_per_worker or int(self.use_gpu)
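For illustration, a stand-alone sketch of how the `__post_init__` validation behaves; the class body is copied from the diff so the snippet runs on its own, and the behaviors shown are inferred from the code rather than separately documented:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class BaseParams:
    cpus_per_worker: int = 1
    use_gpu: bool = False
    gpus_per_worker: Optional[int] = None

    def __post_init__(self):
        # GPUs requested per worker without enabling GPUs is an error.
        if self.gpus_per_worker and not self.use_gpu:
            raise ValueError("gpus_per_worker is set, but use_gpu is False. "
                             "use_gpu must be True if gpus_per_worker is "
                             "set.")
        if self.use_gpu and isinstance(self.gpus_per_worker,
                                       int) and self.gpus_per_worker < 1:
            raise ValueError(
                f"gpus_per_worker must be >= 1: Got {self.gpus_per_worker}.")
        # Default: one GPU per worker when GPUs are enabled, zero otherwise.
        self.gpus_per_worker = self.gpus_per_worker or int(self.use_gpu)

print(BaseParams().gpus_per_worker)              # 0 (CPU-only job)
print(BaseParams(use_gpu=True).gpus_per_worker)  # 1 (implicit default)
```

Note that `gpus_per_worker=2` with `use_gpu=False` raises a `ValueError` rather than silently enabling GPUs.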


class Adapter(ABC):
    """Adapter for executing Ray calls for various types of Horovod jobs
    (e.g. static and elastic).
    """

    @abstractmethod
    def start(self,
              executable_cls: type = None,
              executable_args: Optional[List] = None,
              executable_kwargs: Optional[Dict] = None,
              extra_env_vars: Optional[Dict] = None):
        """Starts the Adapter.

        Args:
            executable_cls (type): The class that will be created within
                an actor (BaseHorovodWorker). This will allow Horovod
                to establish its connections and set env vars.
            executable_args (List): Arguments to be passed into the
                worker class upon initialization.
            executable_kwargs (Dict): Keyword arguments to be passed into the
                worker class upon initialization.
            extra_env_vars (Dict): Environment variables to be set
                on the actors (worker processes) before initialization.
        """
        raise NotImplementedError("Method must be implemented in a subclass")

    @abstractmethod
    def execute(self, fn: Callable[["executable_cls"], Any],
                callbacks: Optional[List[Callable]] = None) -> List[Any]:
        """Executes the provided function on all workers.

        Args:
            fn: Target function to be invoked on every object.
            callbacks: List of callables. Each callback must either
                be a callable function or a class that implements __call__.
                Every callback will be invoked on every value logged
                by the rank 0 worker.

        Returns:
            Deserialized return values from the target function.
        """
        raise NotImplementedError("Method must be implemented in a subclass")

    @abstractmethod
    def run(self,
            fn: Callable[[Any], Any],
            args: Optional[List] = None,
            kwargs: Optional[Dict] = None,
            callbacks: Optional[List[Callable]] = None) -> List[Any]:
        """Executes the provided function on all workers.

        Args:
            fn: Target function that can be executed with arbitrary
                args and keyword arguments.
            args: List of arguments to be passed into the target function.
            kwargs: Dictionary of keyword arguments to be
                passed into the target function.
            callbacks: List of callables. Each callback must either
                be a callable function or a class that implements __call__.
                Every callback will be invoked on every value logged
                by the rank 0 worker.

        Returns:
            Deserialized return values from the target function.
        """
        raise NotImplementedError("Method must be implemented in a subclass")

    @abstractmethod
    def run_remote(self,
                   fn: Callable[[Any], Any],
                   args: Optional[List] = None,
                   kwargs: Optional[Dict] = None,
                   callbacks: Optional[List[Callable]] = None):
        """Executes the provided function on all workers without blocking.

        Args:
            fn: Target function that can be executed with arbitrary
                args and keyword arguments.
            args: List of arguments to be passed into the target function.
            kwargs: Dictionary of keyword arguments to be
                passed into the target function.

        Returns:
            list: List of ObjectRefs that you can run `ray.get` on to
                retrieve values.
        """
        raise NotImplementedError("Method must be implemented in a subclass")

    @abstractmethod
    def execute_single(self,
                       fn: Callable[["executable_cls"], Any]) -> List[Any]:
        """Executes the provided function on the rank 0 worker (chief).

        Args:
            fn: Target function to be invoked on the chief object.

        Returns:
            Deserialized return values from the target function.
        """
        raise NotImplementedError("Method must be implemented in a subclass")

    @abstractmethod
    def shutdown(self):
        """Destroys the adapter."""
        raise NotImplementedError("Method must be implemented in a subclass")
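To make the interface contract concrete, here is a hypothetical in-process implementation of the same method set. It is not part of this commit: the real subclasses dispatch to Ray actors, while `LocalAdapter`, `CounterWorker`, and the thunk-based `run_remote` below are illustrative assumptions that run everything serially in the current process.

```python
from typing import Any, Callable, Dict, List, Optional


class CounterWorker:
    """Tiny stand-in for an executable worker class."""
    def __init__(self, start: int = 0):
        self.value = start


class LocalAdapter:
    """Hypothetical in-process Adapter implementation (no Ray involved)."""

    def __init__(self, num_workers: int = 2):
        self.num_workers = num_workers
        self.workers: List[Any] = []

    def start(self, executable_cls: type = None,
              executable_args: Optional[List] = None,
              executable_kwargs: Optional[Dict] = None,
              extra_env_vars: Optional[Dict] = None):
        # Instantiate one "worker" object per slot, mirroring actor creation.
        args = executable_args or []
        kwargs = executable_kwargs or {}
        self.workers = [
            executable_cls(*args, **kwargs) if executable_cls else None
            for _ in range(self.num_workers)
        ]

    def execute(self, fn: Callable[[Any], Any],
                callbacks: Optional[List[Callable]] = None) -> List[Any]:
        return [fn(w) for w in self.workers]

    def run(self, fn: Callable[..., Any],
            args: Optional[List] = None,
            kwargs: Optional[Dict] = None,
            callbacks: Optional[List[Callable]] = None) -> List[Any]:
        return [fn(*(args or []), **(kwargs or {})) for _ in self.workers]

    def run_remote(self, fn: Callable[..., Any],
                   args: Optional[List] = None,
                   kwargs: Optional[Dict] = None,
                   callbacks: Optional[List[Callable]] = None):
        # The real adapter returns Ray ObjectRefs; zero-argument thunks
        # stand in for them here.
        return [lambda: fn(*(args or []), **(kwargs or {}))
                for _ in self.workers]

    def execute_single(self, fn: Callable[[Any], Any]) -> Any:
        return fn(self.workers[0])

    def shutdown(self):
        self.workers = []


adapter = LocalAdapter(num_workers=3)
adapter.start(executable_cls=CounterWorker, executable_kwargs={"start": 5})
print(adapter.execute(lambda w: w.value))       # [5, 5, 5]
print(adapter.run(lambda x: x * 2, args=[21]))  # [42, 42, 42]
adapter.shutdown()
```

The strategy here is the same one the diff relies on: RayExecutor can hold any object with this surface and delegate `start`/`run`/`shutdown` to it, so static and elastic jobs differ only in which adapter is plugged in.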
1 change: 1 addition & 0 deletions horovod/ray/elastic.py
@@ -179,6 +179,7 @@ class ElasticRayExecutor:
settings, use_gpu=True, cpus_per_slot=2)
executor.start()
executor.run(train_fn)
.. deprecated:: 0.25.0
"""

@staticmethod
