Skip to content

Commit

Permalink
Fix flaky ray tests (#3430)
Browse files Browse the repository at this point in the history
Signed-off-by: Enrico Minack <github@enrico.minack.dev>
  • Loading branch information
EnricoMi committed Mar 1, 2022
1 parent 7b5346e commit b553974
Showing 1 changed file with 20 additions and 32 deletions.
52 changes: 20 additions & 32 deletions test/single/test_ray.py
Expand Up @@ -3,18 +3,18 @@
This is currently not run on the Ray CI.
"""
import os
import socket
import sys

import socket
import pytest
import ray
from ray.util.client.ray_client_helpers import ray_start_client_server
import torch
from ray.util.client.ray_client_helpers import ray_start_client_server

from horovod.common.util import gloo_built
from horovod.ray.runner import (Coordinator, MiniSettings, RayExecutor)
from horovod.ray.worker import BaseHorovodWorker
from horovod.ray.strategy import create_placement_group
from horovod.ray.worker import BaseHorovodWorker

sys.path.append(os.path.dirname(__file__))

Expand All @@ -38,19 +38,27 @@ def ray_start_4_cpus():
@pytest.fixture
def ray_start_6_cpus():
    """Start a local 6-CPU Ray cluster for the duration of one test.

    Yields:
        The address info dict returned by ``ray.init``.
    """
    address_info = ray.init(num_cpus=6)
    try:
        yield address_info
    finally:
        # Teardown must run even when the test body raises, otherwise a
        # half-dead cluster leaks into the next test and makes it flaky.
        ray.shutdown()


@pytest.fixture
def ray_start_4_cpus_4_gpus():
    """Start a 4-CPU / 4-GPU Ray cluster with 4 CUDA devices made visible.

    Forces ``CUDA_VISIBLE_DEVICES=0,1,2,3`` before ``ray.init`` so Ray
    believes 4 GPUs exist, and restores the caller's original value (or
    removes the variable) on teardown.

    Yields:
        The address info dict returned by ``ray.init``.
    """
    orig_devices = os.environ.get("CUDA_VISIBLE_DEVICES")
    os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3"
    address_info = ray.init(num_cpus=4, num_gpus=4)
    try:
        yield address_info
    finally:
        # Shut down inside `finally` so the cluster is torn down even when
        # the test body raises; then restore the environment exactly.
        ray.shutdown()
        if orig_devices is not None:
            os.environ["CUDA_VISIBLE_DEVICES"] = orig_devices
        else:
            del os.environ["CUDA_VISIBLE_DEVICES"]


@pytest.fixture
Expand All @@ -68,17 +76,6 @@ def ray_connect_handler(job_config=None):
yield


def check_resources(original_resources):
    """Poll until Ray's available resources return to *original_resources*.

    Retries up to 10 times, sleeping 0.5 s between attempts, to give Ray
    time to reclaim worker resources after a job shuts down.

    Args:
        original_resources: snapshot previously taken via
            ``ray.available_resources()``.

    Returns:
        True once the current resources match the snapshot, False if they
        never settle within the retry budget.
    """
    # Local import: keeps the module's import section untouched, matching
    # the original code's placement of the dependency.
    import time

    for _ in range(10):
        if original_resources == ray.available_resources():
            return True
        # Log the mismatch so a timeout is diagnosable from test output.
        print(ray.available_resources())
        time.sleep(0.5)
    return False


def test_coordinator_registration():
settings = MiniSettings()
coord = Coordinator(settings)
Expand Down Expand Up @@ -171,33 +168,30 @@ def test_infeasible_placement(ray_start_2_cpus, num_workers, num_hosts,
@pytest.mark.skipif(
    not torch.cuda.is_available(), reason="GPU test requires CUDA.")
def test_gpu_ids(ray_start_4_cpus_4_gpus):
    """4 workers on one host must share one CUDA_VISIBLE_DEVICES of 4 GPUs."""
    setting = RayExecutor.create_settings(timeout_s=30)
    hjob = RayExecutor(
        setting, num_hosts=1, num_workers_per_host=4, use_gpu=True)
    hjob.start()
    all_envs = hjob.execute(lambda _: os.environ.copy())
    all_cudas = {ev["CUDA_VISIBLE_DEVICES"] for ev in all_envs}
    # Every colocated worker must see the identical device list ...
    assert len(all_cudas) == 1, all_cudas
    # ... and that list must name all four GPUs. Include the actual value
    # in the assertion message so a failure is diagnosable from CI logs.
    assert len(all_envs[0]["CUDA_VISIBLE_DEVICES"].split(",")) == 4, \
        all_envs[0]["CUDA_VISIBLE_DEVICES"]
    hjob.shutdown()


@pytest.mark.skipif(
torch.cuda.device_count() < 4, reason="GPU test requires 4 GPUs")
@pytest.mark.skipif(
not torch.cuda.is_available(), reason="GPU test requires CUDA.")
def test_gpu_ids_num_workers(ray_start_4_cpus_4_gpus):
original_resources = ray.available_resources()
setting = RayExecutor.create_settings(timeout_s=30)
hjob = RayExecutor(setting, num_workers=4, use_gpu=True)
hjob.start()
all_envs = hjob.execute(lambda _: os.environ.copy())
all_cudas = {ev["CUDA_VISIBLE_DEVICES"] for ev in all_envs}

assert len(all_cudas) == 1, all_cudas
assert len(all_envs[0]["CUDA_VISIBLE_DEVICES"].split(",")) == 4
assert len(all_envs[0]["CUDA_VISIBLE_DEVICES"].split(",")) == 4, all_envs[0]["CUDA_VISIBLE_DEVICES"]

def _test(worker):
import horovod.torch as hvd
Expand All @@ -208,7 +202,6 @@ def _test(worker):
all_valid_local_rank = hjob.execute(_test)
assert all(all_valid_local_rank)
hjob.shutdown()
assert check_resources(original_resources)


def test_horovod_mixin(ray_start_2_cpus):
Expand All @@ -224,7 +217,6 @@ class Test(BaseHorovodWorker):

@pytest.mark.parametrize(parameter_str, ray_executor_parametrized)
def test_local(ray_start_4_cpus, num_workers, num_hosts, num_workers_per_host):
original_resources = ray.available_resources()
setting = RayExecutor.create_settings(timeout_s=30)
hjob = RayExecutor(
setting,
Expand All @@ -235,16 +227,13 @@ def test_local(ray_start_4_cpus, num_workers, num_hosts, num_workers_per_host):
hostnames = hjob.execute(lambda _: socket.gethostname())
assert len(set(hostnames)) == 1, hostnames
hjob.shutdown()
assert check_resources(original_resources)


@pytest.mark.skipif(
not gloo_built(), reason='Gloo is required for Ray integration')
@pytest.mark.parametrize(parameter_str, ray_executor_parametrized)
def test_ray_init(ray_start_4_cpus, num_workers, num_hosts,
num_workers_per_host):
original_resources = ray.available_resources()

def simple_fn(worker):
import horovod.torch as hvd
hvd.init()
Expand All @@ -261,7 +250,6 @@ def simple_fn(worker):
result = hjob.execute(simple_fn)
assert len(set(result)) == 4
hjob.shutdown()
assert check_resources(original_resources)


@pytest.mark.skipif(
Expand Down

0 comments on commit b553974

Please sign in to comment.