
Refactor ElasticRayExecutor as part of RayExecutor #3190

Closed
tgaddair opened this issue Oct 1, 2021 · 0 comments · Fixed by #3230
tgaddair commented Oct 1, 2021

Currently, ElasticRayExecutor is implemented as a separate class from the normal RayExecutor, but this doesn't need to be the case. If we instead add the ElasticRayExecutor params to the RayExecutor constructor, we can dynamically create an autoscaling worker topology when the user provides min_workers or max_workers, or any other elastic-only params.
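The proposed dispatch can be sketched as follows. This is a minimal illustration of the idea (one constructor selecting a static or autoscaling topology from its arguments); the class name and the `num_workers`/`min_workers`/`max_workers` parameters follow the issue text, but the body is hypothetical, not Horovod's actual implementation:

```python
# Hypothetical sketch of the unified constructor proposed in this issue.
# Parameter names follow the issue text; the body is illustrative only.
class RayExecutor:
    def __init__(self, settings, num_workers=None, min_workers=None,
                 max_workers=None, use_gpu=False):
        self.settings = settings
        self.use_gpu = use_gpu
        # Providing any elastic-only parameter switches the executor to an
        # autoscaling (elastic) worker topology; otherwise the worker count
        # is fixed, as in the current static RayExecutor.
        self.elastic = min_workers is not None or max_workers is not None
        if self.elastic:
            self.min_workers = min_workers if min_workers is not None else 1
            self.max_workers = max_workers  # None = no upper bound
        else:
            self.num_workers = num_workers
```

Under this scheme, `RayExecutor(settings, num_workers=4)` behaves like today's static executor, while `RayExecutor(settings, min_workers=1)` selects the elastic topology.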

ashahab added a commit to ashahab/horovod that referenced this issue Oct 18, 2021
This resolves horovod#3190 with a new RayExecutor API for horovod:
`RayExecutorV2`. This API supports both static (non-elastic) and elastic horovod jobs.

Example of a static job:
```python
import ray
import horovod.torch as hvd
from horovod.ray import RayExecutorV2, NoneElasticParams

ray.init()
executor = RayExecutorV2(settings, NoneElasticParams(
        np=num_workers,
        use_gpu=True
    ))

executor.start()

def simple_fn():
    hvd.init()
    print("hvd rank", hvd.rank())
    return hvd.rank()

result = executor.run(simple_fn)
assert len(set(result)) == hosts * num_slots

executor.shutdown()
```
Example of an elastic job:
```python
import torch
import horovod.torch as hvd
from horovod.ray import RayExecutorV2, ElasticParams

def training_fn():
    hvd.init()
    model = Model()
    torch.cuda.set_device(hvd.local_rank())

    @hvd.elastic.run
    def train(state):
        for state.epoch in range(state.epoch, epochs):
            ...
            state.commit()

    state = hvd.elastic.TorchState(model, optimizer, batch=0, epoch=0)
    state.register_reset_callbacks([on_state_reset])
    train(state)
    return

executor = RayExecutorV2(settings, ElasticParams(use_gpu=True, cpus_per_worker=2))
executor.start()
executor.run(training_fn)
```
ashahab added a commit to ashahab/horovod that referenced this issue Nov 5, 2021
This resolves horovod#3190 by adding elastic params to the RayExecutor API for horovod.
The API now supports both static (non-elastic) and elastic horovod jobs.

Example of a static job (identical to the current RayExecutor):
```python
import ray
import horovod.torch as hvd
from horovod.ray import RayExecutor

ray.init()
executor = RayExecutor(settings, num_workers=num_workers,
                       use_gpu=True)

executor.start()

def simple_fn():
    hvd.init()
    print("hvd rank", hvd.rank())
    return hvd.rank()

result = executor.run(simple_fn)
assert len(set(result)) == hosts * num_slots

executor.shutdown()
```
Example of an elastic job:
```python
import torch
import horovod.torch as hvd
from horovod.ray import RayExecutor

def training_fn():
    hvd.init()
    model = Model()
    torch.cuda.set_device(hvd.local_rank())

    @hvd.elastic.run
    def train(state):
        for state.epoch in range(state.epoch, epochs):
            ...
            state.commit()

    state = hvd.elastic.TorchState(model, optimizer, batch=0, epoch=0)
    state.register_reset_callbacks([on_state_reset])
    train(state)
    return

executor = RayExecutor(settings, min_workers=1, use_gpu=True, cpus_per_worker=2)
executor.start()
executor.run(training_fn)
```

Signed-off-by: Abin Shahab <ashahab@linkedin.com>
ashahab added a commit that referenced this issue Nov 17, 2021