Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flaky ray tests #3430

Merged
merged 9 commits into from Mar 1, 2022
52 changes: 20 additions & 32 deletions test/single/test_ray.py
Expand Up @@ -3,18 +3,18 @@
This is currently not run on the Ray CI.
"""
import os
import socket
import sys

import socket
import pytest
import ray
from ray.util.client.ray_client_helpers import ray_start_client_server
import torch
from ray.util.client.ray_client_helpers import ray_start_client_server

from horovod.common.util import gloo_built
from horovod.ray.runner import (Coordinator, MiniSettings, RayExecutor)
from horovod.ray.worker import BaseHorovodWorker
from horovod.ray.strategy import create_placement_group
from horovod.ray.worker import BaseHorovodWorker

sys.path.append(os.path.dirname(__file__))

Expand All @@ -38,19 +38,27 @@ def ray_start_4_cpus():
@pytest.fixture
def ray_start_6_cpus():
    """Start a local 6-CPU Ray cluster and guarantee shutdown on teardown.

    Yields:
        The address info dict returned by ``ray.init``.
    """
    address_info = ray.init(num_cpus=6)
    try:
        yield address_info
    finally:
        # The code after the yield runs as teardown. Using try/finally makes
        # the shutdown unconditional, so a failing test cannot leak a live
        # Ray instance into the next test (source of flakiness).
        ray.shutdown()


@pytest.fixture
def ray_start_4_cpus_4_gpus():
    """Start a local 4-CPU/4-GPU Ray cluster with CUDA_VISIBLE_DEVICES pinned.

    Saves the caller's CUDA_VISIBLE_DEVICES and restores it on teardown so
    the fixture does not leak environment state between tests.

    Yields:
        The address info dict returned by ``ray.init``.
    """
    orig_devices = os.environ.get("CUDA_VISIBLE_DEVICES")
    os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3"
    address_info = ray.init(num_cpus=4, num_gpus=4)
    try:
        yield address_info
    finally:
        # Teardown must run even when the test body raises: shut Ray down
        # and put the environment back exactly as we found it.
        ray.shutdown()
        # None (unset) and "" (set but empty) have different meanings for
        # CUDA_VISIBLE_DEVICES, so test against None explicitly to restore
        # a previously-set empty value instead of deleting it.
        if orig_devices is not None:
            os.environ["CUDA_VISIBLE_DEVICES"] = orig_devices
        else:
            del os.environ["CUDA_VISIBLE_DEVICES"]


@pytest.fixture
Expand All @@ -68,17 +76,6 @@ def ray_connect_handler(job_config=None):
yield


def check_resources(original_resources):
    """Poll until the cluster's available resources match the snapshot.

    Retries up to 10 times with a 0.5s pause between attempts, printing the
    current availability on each miss so flaky runs are diagnosable.

    Returns:
        True as soon as ``ray.available_resources()`` equals
        ``original_resources``; False if it never settles within the budget.
    """
    import time

    for _ in range(10):
        if ray.available_resources() == original_resources:
            return True
        print(ray.available_resources())
        time.sleep(0.5)
    return False


def test_coordinator_registration():
settings = MiniSettings()
coord = Coordinator(settings)
Expand Down Expand Up @@ -171,33 +168,30 @@ def test_infeasible_placement(ray_start_2_cpus, num_workers, num_hosts,
@pytest.mark.skipif(
    not torch.cuda.is_available(), reason="GPU test requires CUDA.")
def test_gpu_ids(ray_start_4_cpus_4_gpus):
    """All 4 co-located workers must share one 4-GPU CUDA_VISIBLE_DEVICES."""
    setting = RayExecutor.create_settings(timeout_s=30)
    hjob = RayExecutor(
        setting, num_hosts=1, num_workers_per_host=4, use_gpu=True)
    hjob.start()
    all_envs = hjob.execute(lambda _: os.environ.copy())
    all_cudas = {ev["CUDA_VISIBLE_DEVICES"] for ev in all_envs}
    # Every worker on the single host sees the identical device list...
    assert len(all_cudas) == 1, all_cudas
    # ...and that list names all four GPUs. The failure message carries the
    # actual value so a flaky failure is diagnosable from CI logs alone.
    assert len(all_envs[0]["CUDA_VISIBLE_DEVICES"].split(",")) == 4, \
        all_envs[0]["CUDA_VISIBLE_DEVICES"]
    hjob.shutdown()


@pytest.mark.skipif(
torch.cuda.device_count() < 4, reason="GPU test requires 4 GPUs")
@pytest.mark.skipif(
not torch.cuda.is_available(), reason="GPU test requires CUDA.")
def test_gpu_ids_num_workers(ray_start_4_cpus_4_gpus):
original_resources = ray.available_resources()
setting = RayExecutor.create_settings(timeout_s=30)
hjob = RayExecutor(setting, num_workers=4, use_gpu=True)
hjob.start()
all_envs = hjob.execute(lambda _: os.environ.copy())
all_cudas = {ev["CUDA_VISIBLE_DEVICES"] for ev in all_envs}

assert len(all_cudas) == 1, all_cudas
assert len(all_envs[0]["CUDA_VISIBLE_DEVICES"].split(",")) == 4
assert len(all_envs[0]["CUDA_VISIBLE_DEVICES"].split(",")) == 4, all_envs[0]["CUDA_VISIBLE_DEVICES"]

def _test(worker):
import horovod.torch as hvd
Expand All @@ -208,7 +202,6 @@ def _test(worker):
all_valid_local_rank = hjob.execute(_test)
assert all(all_valid_local_rank)
hjob.shutdown()
assert check_resources(original_resources)


def test_horovod_mixin(ray_start_2_cpus):
Expand All @@ -224,7 +217,6 @@ class Test(BaseHorovodWorker):

@pytest.mark.parametrize(parameter_str, ray_executor_parametrized)
def test_local(ray_start_4_cpus, num_workers, num_hosts, num_workers_per_host):
original_resources = ray.available_resources()
setting = RayExecutor.create_settings(timeout_s=30)
hjob = RayExecutor(
setting,
Expand All @@ -235,16 +227,13 @@ def test_local(ray_start_4_cpus, num_workers, num_hosts, num_workers_per_host):
hostnames = hjob.execute(lambda _: socket.gethostname())
assert len(set(hostnames)) == 1, hostnames
hjob.shutdown()
assert check_resources(original_resources)


@pytest.mark.skipif(
not gloo_built(), reason='Gloo is required for Ray integration')
@pytest.mark.parametrize(parameter_str, ray_executor_parametrized)
def test_ray_init(ray_start_4_cpus, num_workers, num_hosts,
num_workers_per_host):
original_resources = ray.available_resources()

def simple_fn(worker):
import horovod.torch as hvd
hvd.init()
Expand All @@ -261,7 +250,6 @@ def simple_fn(worker):
result = hjob.execute(simple_fn)
assert len(set(result)) == 4
hjob.shutdown()
assert check_resources(original_resources)


@pytest.mark.skipif(
Expand Down