RayExecutorV2: Dynamic executor for elastic and static jobs
This resolves horovod#3190 with a new RayExecutor API for Horovod:
`RayExecutorV2`. This API supports both static (non-elastic) and elastic Horovod jobs.

Example of a static job:
```python
import ray
import horovod.torch as hvd
from horovod.ray import RayExecutorV2, NonElasticParams

ray.init()
setting = RayExecutorV2.create_settings(verbose=True)
executor = RayExecutorV2(setting, NonElasticParams(
        np=num_workers,
        use_gpu=True
    ))

executor.start()

def simple_fn():
    hvd.init()
    print("hvd rank", hvd.rank())
    return hvd.rank()

result = executor.run(simple_fn)
assert len(set(result)) == num_workers

executor.shutdown()
```
Example of an elastic job:
```python
import ray
import torch
import horovod.torch as hvd
from horovod.ray import RayExecutorV2, ElasticParams

def training_fn():
    hvd.init()
    model = Model()
    torch.cuda.set_device(hvd.local_rank())

    @hvd.elastic.run
    def train(state):
        for state.epoch in range(state.epoch, epochs):
            ...
            state.commit()

    # `optimizer` and `on_state_reset` are assumed to be defined by the user
    state = hvd.elastic.TorchState(model, optimizer, batch=0, epoch=0)
    state.register_reset_callbacks([on_state_reset])
    train(state)
    return

ray.init()
settings = RayExecutorV2.create_settings(verbose=True)
executor = RayExecutorV2(settings, ElasticParams(use_gpu=True, cpus_per_worker=2))
executor.start()
executor.run(training_fn)
```
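The elastic example above works because training state is periodically committed, so surviving workers can roll back to the last commit when the worker set changes. A minimal, framework-free sketch of that commit/rollback idea (the `ElasticState` class here is illustrative, not Horovod's actual `hvd.elastic` API):

```python
# Illustrative sketch of the commit/rollback pattern behind elastic training.
# A real implementation (e.g. hvd.elastic.TorchState) would also snapshot
# model and optimizer tensors, not just scalar counters.

class ElasticState:
    """Tracks training progress and can roll back to the last commit."""

    def __init__(self, epoch=0, batch=0):
        self.epoch = epoch
        self.batch = batch
        self._committed = (epoch, batch)

    def commit(self):
        # Snapshot current progress as the rollback point.
        self._committed = (self.epoch, self.batch)

    def restore(self):
        # Roll back to the last commit, e.g. after a worker joins or leaves.
        self.epoch, self.batch = self._committed


state = ElasticState()
for state.epoch in range(state.epoch, 3):
    state.batch = 10      # pretend we processed 10 batches this epoch
    state.commit()

state.batch = 4           # progress made since the last commit...
state.restore()           # ...is discarded on a membership change
print(state.epoch, state.batch)   # -> 2 10
```

Committing more often loses less work on a reset but adds snapshot overhead, which is why `state.commit()` in the example above is called once per epoch rather than per batch.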

Signed-off-by: Abin Shahab <ashahab@linkedin.com>
ashahab committed Nov 4, 2021
1 parent 660f7ff commit 69cf2af
Showing 7 changed files with 419 additions and 322 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
@@ -7,13 +7,15 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
## [Unreleased] - YYYY-MM-DD

### Added
+- Added RayExecutorV2 API: This API supports both static (non-elastic) and elastic Horovod jobs. This resolves issue:
+[#3190](https://github.com/horovod/horovod/issues/3190).

- TensorFlow: Added in-place broadcasting of variables. ([#3128](https://github.com/horovod/horovod/pull/3128))

### Changed

### Deprecated

+- Deprecated RayExecutor and ElasticRayExecutor APIs in favor of RayExecutorV2 API for issue: [#3190](https://github.com/horovod/horovod/issues/3190).
### Removed

### Fixed
22 changes: 12 additions & 10 deletions docs/ray.rst
@@ -5,7 +5,7 @@ Horovod on Ray

``horovod.ray`` allows users to leverage Horovod on `a Ray cluster <https://docs.ray.io/en/latest/cluster/index.html>`_.

-Currently, the Ray + Horovod integration provides a :ref:`RayExecutor API <horovod_ray_api>`.
+Currently, the Ray + Horovod integration provides a :ref:`RayExecutorV2 API <horovod_ray_api>`.

.. note:: The Ray + Horovod integration currently only supports a Gloo backend.

@@ -24,25 +24,25 @@ See the Ray documentation for `advanced installation instructions <https://docs.
Horovod Ray Executor
--------------------

-The Horovod Ray integration offers a ``RayExecutor`` abstraction (:ref:`docs <horovod_ray_api>`),
+The Horovod Ray integration offers a ``RayExecutorV2`` abstraction (:ref:`docs <horovod_ray_api>`),
which is a wrapper over a group of `Ray actors (stateful processes) <https://docs.ray.io/en/latest/walkthrough.html#remote-classes-actors>`_.

.. code-block:: python
-from horovod.ray import RayExecutor
+from horovod.ray import RayExecutorV2
# Start the Ray cluster or attach to an existing Ray cluster
ray.init()
# Start num_workers actors on the cluster
-executor = RayExecutor(
+executor = RayExecutorV2(
setting, num_workers=num_workers, use_gpu=True)
# This will launch `num_workers` actors on the Ray Cluster.
executor.start()
-All actors will be part of the Horovod ring, so ``RayExecutor`` invocations will be able to support arbitrary Horovod collective operations.
+All actors will be part of the Horovod ring, so ``RayExecutorV2`` invocations will be able to support arbitrary Horovod collective operations.

Note that there is an implicit assumption on the cluster being homogenous in shape (i.e., all machines have the same number of slots available). This is simply
an implementation detail and is not a fundamental limitation.
@@ -74,7 +74,7 @@ A unique feature of Ray is its support for `stateful Actors <https://docs.ray.io
import torch
from horovod.torch import hvd
-from horovod.ray import RayExecutor
+from horovod.ray import RayExecutorV2, NonElasticParams
class MyModel:
def __init__(self, learning_rate):
@@ -93,7 +93,7 @@ A unique feature of Ray is its support for `stateful Actors <https://docs.ray.io
ray.init()
-executor = RayExecutor(...)
+executor = RayExecutorV2(...)
executor.start(executable_cls=MyModel)
# Run 5 training steps
@@ -153,10 +153,12 @@ You can then attach to the underlying Ray cluster and execute the training funct
.. code-block:: python
import ray
+from horovod.ray import RayExecutorV2, ElasticParams
ray.init(address="auto") # attach to the Ray cluster
-settings = ElasticRayExecutor.create_settings(verbose=True)
-executor = ElasticRayExecutor(
-    settings, use_gpu=True, cpus_per_slot=2)
+settings = RayExecutorV2.create_settings(verbose=True)
+executor = RayExecutorV2(
+    settings, ElasticParams(min_np=1, use_gpu=True, cpus_per_slot=2))
executor.start()
executor.run(training_fn)
3 changes: 1 addition & 2 deletions horovod/ray/__init__.py
@@ -1,5 +1,4 @@
from .worker import BaseHorovodWorker
from .runner import RayExecutor
-from .elastic import ElasticRayExecutor

-__all__ = ["RayExecutor", "BaseHorovodWorker", "ElasticRayExecutor"]
+__all__ = ["RayExecutor", "BaseHorovodWorker"]
