Remove gen_cluster spill tests (#758)
The way coroutines are handled in Distributed tests using gen_cluster was
changed in dask/distributed#5446, breaking the two tests that still
relied on gen_cluster. Since we don't encourage using coroutines/yield,
there is little reason to keep those tests.
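
For reference, below is a minimal sketch of the async/await style that the surviving tests in this file rely on, in place of the removed yield-based gen_cluster coroutines. The test name, timeout, array size, and chunk size are illustrative assumptions, not taken from the test suite.

```python
# Hypothetical sketch: the async/await test style favored over yield-based
# gen_cluster coroutines. Names, sizes, and timeout are illustrative only.
import dask.array as da
from distributed import Client
from distributed.utils_test import gen_test

from dask_cuda import LocalCUDACluster


@gen_test(timeout=120)
async def test_async_style():
    # `async with` manages the cluster and client lifetimes explicitly,
    # instead of relying on gen_cluster's injected fixtures.
    async with LocalCUDACluster(n_workers=1, asynchronous=True) as cluster:
        async with Client(cluster, asynchronous=True) as client:
            x = da.random.random(int(1e6), chunks=int(1e5))
            # `await` replaces the old `res = yield client.compute(...)` pattern.
            res = await client.compute(x.sum())
            assert res > 0
```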

Authors:
  - Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
  - Benjamin Zaitlen (https://github.com/quasiben)

URL: #758
pentschev committed Oct 22, 2021
1 parent e9a291a commit 36a0251
Showing 1 changed file with 1 addition and 171 deletions.
172 changes: 1 addition & 171 deletions dask_cuda/tests/test_spill.py
@@ -10,10 +10,8 @@
from distributed.metrics import time
from distributed.sizeof import sizeof
from distributed.utils_test import gen_cluster, gen_test, loop # noqa: F401
from distributed.worker import Worker

from dask_cuda import LocalCUDACluster, utils
from dask_cuda.device_host_file import DeviceHostFile

if utils.get_device_total_memory() < 1e10:
    pytest.skip("Not enough GPU memory", allow_module_level=True)
@@ -80,85 +78,6 @@ def delayed_worker_assert(total_size, device_chunk_overhead, serialized_chunk_ov
    )


# @pytest.mark.xfail(reason="https://github.com/rapidsai/dask-cuda/issues/79")
@pytest.mark.parametrize(
    "params",
    [
        {
            "device_memory_limit": int(200e6),
            "memory_limit": int(800e6),
            "host_target": 0.0,
            "host_spill": 0.0,
            "host_pause": 0.0,
            "spills_to_disk": False,
        },
        {
            "device_memory_limit": int(200e6),
            "memory_limit": int(200e6),
            "host_target": 0.0,
            "host_spill": 0.0,
            "host_pause": 0.0,
            "spills_to_disk": True,
        },
        {
            "device_memory_limit": int(200e6),
            "memory_limit": 0,
            "host_target": 0.0,
            "host_spill": 0.0,
            "host_pause": 0.0,
            "spills_to_disk": False,
        },
    ],
)
def test_cupy_device_spill(params):
    @gen_cluster(
        client=True,
        nthreads=[("127.0.0.1", 1)],
        Worker=Worker,
        timeout=60,
        worker_kwargs={
            "memory_limit": params["memory_limit"],
            "data": DeviceHostFile(
                device_memory_limit=params["device_memory_limit"],
                memory_limit=params["memory_limit"],
            ),
        },
        config={
            "distributed.comm.timeouts.connect": "20s",
            "distributed.worker.memory.target": params["host_target"],
            "distributed.worker.memory.spill": params["host_spill"],
            "distributed.worker.memory.pause": params["host_pause"],
        },
    )
    def test_device_spill(client, scheduler, worker):
        cupy = pytest.importorskip("cupy")
        rs = da.random.RandomState(RandomState=cupy.random.RandomState)
        x = rs.random(int(50e6), chunks=2e6)

        xx = x.persist()
        yield wait(xx)

        # Allow up to 1024 bytes overhead per chunk serialized
        yield client.run(worker_assert, x.nbytes, 1024, 1024)

        y = client.compute(x.sum())
        res = yield y

        assert (abs(res / x.size) - 0.5) < 1e-3

        yield client.run(worker_assert, x.nbytes, 1024, 1024)
        host_chunks = yield client.run(lambda: len(get_worker().data.host))
        disk_chunks = yield client.run(lambda: len(get_worker().data.disk or list()))
        for hc, dc in zip(host_chunks.values(), disk_chunks.values()):
            if params["spills_to_disk"]:
                assert dc > 0
            else:
                assert hc > 0
                assert dc == 0

    test_device_spill()


@pytest.mark.parametrize(
    "params",
    [
@@ -234,95 +153,6 @@ async def test_cupy_cluster_device_spill(params):
                        assert dc == 0


@pytest.mark.xfail(reason="https://github.com/rapidsai/dask-cuda/issues/79")
@pytest.mark.parametrize(
    "params",
    [
        {
            "device_memory_limit": int(200e6),
            "memory_limit": int(800e6),
            "host_target": 0.0,
            "host_spill": 0.0,
            "host_pause": 0.0,
            "spills_to_disk": False,
        },
        {
            "device_memory_limit": int(200e6),
            "memory_limit": int(200e6),
            "host_target": 0.0,
            "host_spill": 0.0,
            "host_pause": 0.0,
            "spills_to_disk": True,
        },
        {
            "device_memory_limit": int(200e6),
            "memory_limit": 0,
            "host_target": 0.0,
            "host_spill": 0.0,
            "host_pause": 0.0,
            "spills_to_disk": False,
        },
    ],
)
def test_cudf_device_spill(params):
    @gen_cluster(
        client=True,
        nthreads=[("127.0.0.1", 1)],
        Worker=Worker,
        timeout=60,
        worker_kwargs={
            "memory_limit": params["memory_limit"],
            "data": DeviceHostFile(
                device_memory_limit=params["device_memory_limit"],
                memory_limit=params["memory_limit"],
            ),
        },
        config={
            "distributed.comm.timeouts.connect": "20s",
            "distributed.worker.memory.target": params["host_target"],
            "distributed.worker.memory.spill": params["host_spill"],
            "distributed.worker.memory.pause": params["host_pause"],
        },
    )
    def test_device_spill(client, scheduler, worker):
        cudf = pytest.importorskip("cudf")
        # There's a known issue with datetime64:
        # https://github.com/numpy/numpy/issues/4983#issuecomment-441332940
        # The same error above happens when spilling datetime64 to disk
        cdf = (
            dask.datasets.timeseries(dtypes={"x": int, "y": float}, freq="100ms")
            .reset_index(drop=True)
            .map_partitions(cudf.from_pandas)
        )

        sizes = yield client.compute(cdf.map_partitions(lambda df: df.__sizeof__()))
        sizes = sizes.tolist()
        nbytes = sum(sizes)
        part_index_nbytes = (yield client.compute(cdf.partitions[0].index)).__sizeof__()

        cdf2 = cdf.persist()
        yield wait(cdf2)

        del cdf

        host_chunks = yield client.run(lambda: len(get_worker().data.host))
        disk_chunks = yield client.run(lambda: len(get_worker().data.disk or list()))
        for hc, dc in zip(host_chunks.values(), disk_chunks.values()):
            if params["spills_to_disk"]:
                assert dc > 0
            else:
                assert hc > 0
                assert dc == 0

        yield client.run(worker_assert, nbytes, 32, 2048 + part_index_nbytes)

        del cdf2

        yield client.run(delayed_worker_assert, 0, 0, 0)

    test_device_spill()


@pytest.mark.xfail(reason="https://github.com/rapidsai/dask-cuda/issues/79")
@pytest.mark.parametrize(
    "params",
@@ -358,7 +188,7 @@ async def test_cudf_cluster_device_spill(params):
    cudf = pytest.importorskip("cudf")
    with dask.config.set({"distributed.worker.memory.terminate": False}):
        async with LocalCUDACluster(
            1,
            n_workers=1,
            device_memory_limit=params["device_memory_limit"],
            memory_limit=params["memory_limit"],
            memory_target_fraction=params["host_target"],
