
RayExecutor: Dynamic executor for elastic and static jobs #3230

Merged
merged 1 commit into horovod:master on Nov 17, 2021

Conversation

@ashahab (Collaborator) commented on Oct 19, 2021

RayExecutor V2: Dynamic executor for elastic and static jobs

This resolves #3190 by adding elastic params to the RayExecutor API for Horovod. The API now supports both static (non-elastic) and elastic Horovod jobs.

Example of a static job (identical to the current RayExecutor):

```python
import ray
import horovod.torch as hvd
from horovod.ray import RayExecutor

ray.init()

# settings, num_workers, hosts and num_slots are assumed to be defined elsewhere,
# e.g. settings = RayExecutor.create_settings(timeout_s=30)
executor = RayExecutor(settings, num_workers=num_workers, use_gpu=True)

executor.start()

def simple_fn():
    hvd.init()
    print("hvd rank", hvd.rank())
    return hvd.rank()

result = executor.run(simple_fn)
assert len(set(result)) == hosts * num_slots

executor.shutdown()
```

Example of an elastic job:

```python
import ray
import torch
import horovod.torch as hvd
from horovod.ray import RayExecutor

def training_fn():
    hvd.init()
    # Model, optimizer, epochs and on_state_reset are assumed to be defined elsewhere.
    model = Model()
    torch.cuda.set_device(hvd.local_rank())

    @hvd.elastic.run
    def train(state):
        for state.epoch in range(state.epoch, epochs):
            ...
            state.commit()

    state = hvd.elastic.TorchState(model, optimizer, batch=0, epoch=0)
    state.register_reset_callbacks([on_state_reset])
    train(state)

ray.init()

# settings is assumed to be defined as in the static example above.
executor = RayExecutor(settings, min_workers=1, use_gpu=True, cpus_per_worker=2)
executor.start()
executor.run(training_fn)
```

github-actions bot commented on Oct 19, 2021

Unit Test Results

765 files (+369)   765 suites (+369)   6h 6m 54s ⏱️ (+2h 4m 35s)
602 tests (+11): 561 ✔️ (+65), 36 💤 (−55), 5 failed (+1)
16 246 runs (+7 731): 11 361 ✔️ (+5 725), 4 867 💤 (+1 996), 18 failed (+10)

For more details on these failures, see this check.

Results for commit e031074. ± Comparison against base commit 06aa579.

This pull request removes 1 and adds 12 tests. Note that renamed tests count towards both.
test.single.test_ray ‑ test_ray_deprecation
test.single.test_ray_elastic_v2 ‑ test_both_num_workers_min_workers
test.single.test_ray_elastic_v2 ‑ test_fault_tolerance_hosts_added_and_removed
test.single.test_ray_elastic_v2 ‑ test_fault_tolerance_hosts_remove_and_add
test.single.test_ray_elastic_v2 ‑ test_gpu_e2e
test.single.test_ray_elastic_v2 ‑ test_max_np
test.single.test_ray_elastic_v2 ‑ test_min_np
test.single.test_ray_elastic_v2.TestRayDiscoverySuite ‑ test_cpu_discovery
test.single.test_ray_elastic_v2.TestRayDiscoverySuite ‑ test_gpu_discovery
test.single.test_ray_elastic_v2.TestRayDiscoverySuite ‑ test_gpu_slot_discovery
test.single.test_ray_elastic_v2.TestRayDiscoverySuite ‑ test_multinode
…

♻️ This comment has been updated with latest results.

github-actions bot commented on Oct 19, 2021

Unit Test Results (with flaky tests)

995 files (+435)   995 suites (+435)   7h 0m 29s ⏱️ (+2h 22m 32s)
602 tests (+11): 560 ✔️ (+64), 36 💤 (−55), 6 failed (+2)
21 504 runs (+9 205): 14 762 ✔️ (+6 852), 6 687 💤 (+2 322), 55 failed (+31)

For more details on these failures, see this check.

Results for commit e031074. ± Comparison against base commit 06aa579.

This pull request removes 1 and adds 12 tests. Note that renamed tests count towards both.
test.single.test_ray ‑ test_ray_deprecation
test.single.test_ray_elastic_v2 ‑ test_both_num_workers_min_workers
test.single.test_ray_elastic_v2 ‑ test_fault_tolerance_hosts_added_and_removed
test.single.test_ray_elastic_v2 ‑ test_fault_tolerance_hosts_remove_and_add
test.single.test_ray_elastic_v2 ‑ test_gpu_e2e
test.single.test_ray_elastic_v2 ‑ test_max_np
test.single.test_ray_elastic_v2 ‑ test_min_np
test.single.test_ray_elastic_v2.TestRayDiscoverySuite ‑ test_cpu_discovery
test.single.test_ray_elastic_v2.TestRayDiscoverySuite ‑ test_gpu_discovery
test.single.test_ray_elastic_v2.TestRayDiscoverySuite ‑ test_gpu_slot_discovery
test.single.test_ray_elastic_v2.TestRayDiscoverySuite ‑ test_multinode
…

♻️ This comment has been updated with latest results.

@ashahab force-pushed the abin-issue-3190-v2 branch 2 times, most recently from cc8edda to 8195064, on October 22, 2021 at 19:29
@richardliaw (Collaborator) left a comment

@ashahab Can you please explicitly state what is the delta between the standard RayExecutor and the new RayExecutorV2 in terms of API?

Is it possible to maintain compatibility across the two versions (while being OK with breaking ElasticRayExecutor)?

@ashahab (Collaborator, Author) commented on Nov 3, 2021

@ashahab Can you please explicitly state what is the delta between the standard RayExecutor and the new RayExecutorV2 in terms of API?

Is it possible to maintain compatibility across the two versions (while being OK with breaking ElasticRayExecutor)?

@richardliaw I see two options here:

Option 1: Keep the RayExecutor and RayExecutorV2 APIs identical; elastic and non-elastic modes would behave differently.
I can update the current PR to make RayExecutorV2 identical to RayExecutor, with the difference being that when users invoke the elastic API, the start and run methods change behavior.
The ElasticRayExecutor does all of the worker orchestration and execution in the run function, whereas the RayExecutor performs worker orchestration in start and execution in one or more run calls.
When users pass the elastic params to RayExecutorV2, e.g. min_np and max_np, run would change its behavior.

Option 2: Get rid of the start function. This makes elastic and non-elastic modes identical in terms of orchestration and execution.
I had some discussion with @amogkam about removing the start function from RayExecutorV2 completely. That would make the API very different from RayExecutor, but both the elastic and non-elastic versions would do orchestration and execution in the run function, and they would probably share many more code paths.

Which option is preferred? @tgaddair, what's your thought on keeping the current API vs. making the new API identical for elastic and non-elastic?
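
For concreteness, a rough sketch of how the two options might look from the caller's side. This is hypothetical, not the merged implementation; the elastic parameter name follows the example above (min_workers), while the discussion also refers to min_np/max_np.

```python
import ray
import horovod.torch as hvd
from horovod.ray import RayExecutor

def training_fn():
    hvd.init()
    return hvd.rank()

ray.init()
settings = RayExecutor.create_settings(timeout_s=30)

# Option 1: constructor and lifecycle stay identical to today's RayExecutor;
# passing an elastic param switches what start() and run() do internally.
executor = RayExecutor(settings, min_workers=1, use_gpu=False, cpus_per_worker=1)
executor.start()            # elastic mode: only launches the driver/discovery
executor.run(training_fn)   # elastic mode: worker orchestration happens here
executor.shutdown()

# Option 2: no start(); run() would do orchestration and execution in both modes.
# executor = RayExecutor(settings, min_workers=1, use_gpu=False)
# executor.run(training_fn)
```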

@richardliaw (Collaborator) commented

I think (1) is a safer move given that there are multiple organizations depending on RayExecutor, and introducing instability into the supply chain is highly undesirable.

I'm actually not that worried about the behaviors of "start" being different from "run". That being said, I would be open to user feedback in the future about the APIs being unintuitive.

I would recommend documenting it clearly though.

@tgaddair (Collaborator) commented on Nov 4, 2021

Hey @ashahab, this looks awesome. In general, I think this does a great job of cleanly unifying the elastic and non-elastic use cases. A few general observations based on the above discussion:

  1. I agree with @richardliaw that we should try to keep the user-facing RayExecutor API unchanged for the non-elastic scenario, since there are organizations using it in production.
  2. I also agree that breaking / removing ElasticRayExecutor is acceptable (though we should probably start with a deprecation warning), with the plan being to remove it in v1.0.
  3. To achieve goals (1) and (2), my suggestion would be to not have the separate ElasticParams and NonElasticParams. Instead, we use the same set of kwargs for both, and then use the ElasticAdapter if min_np or max_np is set explicitly. Otherwise, we use the existing non-elastic behavior. Alternatively, we can have an elastic=False kwarg; I'm fine either way (we just need to do more validation logic with this additional param). See the sketch after this list.
  4. Having different behavior for start() and run() is okay, as Richard said, we should just be clear in the docs what the differences are. For elastic, I think it's fine if start() simply creates the driver and begins the discovery process and other background processes, as it does now. In the future, it would also be useful to create the desired set of workers, which could then be updated dynamically as part of the add / removal process. This would help make the elastic API align better with the non-elastic API (so the current set of workers can be exposed through the API), but it's not essential at the moment, I think.
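
A rough, hypothetical sketch of the dispatch described in (3): the helper name is made up, ElasticAdapter comes from the discussion above, and the non-elastic name is a stand-in.

```python
# Hypothetical sketch of point (3): one set of kwargs for both modes; the elastic
# path is chosen only when min_np or max_np is passed explicitly.
def _select_adapter(num_workers=None, min_np=None, max_np=None):
    """Return which adapter the executor would construct for the given kwargs."""
    if min_np is not None or max_np is not None:
        return "ElasticAdapter"       # elastic behavior
    return "NonElasticAdapter"        # existing static (non-elastic) behavior

assert _select_adapter(num_workers=4) == "NonElasticAdapter"
assert _select_adapter(min_np=1, max_np=4) == "ElasticAdapter"
```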

There is a useful benefit to start(), which is stateful execution. This is something we make use of in Ludwig, for example:

https://github.com/ludwig-ai/ludwig/blob/master/ludwig/backend/ray.py#L304
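
A minimal, hypothetical illustration of that stateful pattern (made-up function names; this is not the Ludwig code linked above):

```python
# The same worker processes serve every run() call after start(), so process-level
# state set up in one call (an env var here; cached data in Ludwig's case) is still
# there in the next call.
import os
import ray
from horovod.ray import RayExecutor

def setup_fn():
    os.environ["DATA_READY"] = "1"        # stand-in for expensive per-worker setup

def train_fn():
    return os.environ.get("DATA_READY")   # still "1": the worker process persisted

ray.init()
settings = RayExecutor.create_settings(timeout_s=30)
executor = RayExecutor(settings, num_workers=2, use_gpu=False)
executor.start()
executor.run(setup_fn)
print(executor.run(train_fn))             # expected: ['1', '1']
executor.shutdown()
```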

So to sum up, I think preserving RayExecutor's existing API is important, and it's okay if the elastic mode behaves slightly differently for now, as long as it's documented and there is a plan for getting closer alignment in the future (where it makes sense).

@ashahab @richardliaw does this make sense to you, or were there other considerations I missed?

@ashahab force-pushed the abin-issue-3190-v2 branch 2 times, most recently from 69cf2af to f0af3e5, on November 5, 2021 at 21:27
@ashahab changed the title from "RayExecutorV2: Dynamic executor for elastic and static jobs" to "RayExecutor: Dynamic executor for elastic and static jobs" on Nov 5, 2021
@ashahab force-pushed the abin-issue-3190-v2 branch 3 times, most recently from 97fd361 to d482803, on November 7, 2021 at 05:22
Two review threads on horovod/ray/runner.py were marked outdated and resolved.
@ashahab force-pushed the abin-issue-3190-v2 branch 3 times, most recently from ec16f73 to bbece07, on November 10, 2021 at 18:34
@richardliaw (Collaborator) left a comment

This diff looks good to me. Can you try merging master again?

@ashahab merged commit a729ba7 into horovod:master on Nov 17, 2021
Labels: none
Linked issue (may be closed by this PR): Refactor ElasticRayExecutor as part of RayExecutor (#3190)
3 participants