From ebe83b048c991c2fb3657eda27e96b99695acbd6 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Fri, 22 Oct 2021 05:51:13 -0700 Subject: [PATCH] Remove gen_cluster spill tests The way coroutines are handled in Distributed tests with gen_cluster was changed in https://github.com/dask/distributed/pull/5446, breaking the two tests that still relied on gen_cluster. Since we don't encourage using coroutines/yield, there's really little reason to keep those tests. --- dask_cuda/tests/test_spill.py | 172 +--------------------------------- 1 file changed, 1 insertion(+), 171 deletions(-) diff --git a/dask_cuda/tests/test_spill.py b/dask_cuda/tests/test_spill.py index 5b62406d..ef2b3ded 100644 --- a/dask_cuda/tests/test_spill.py +++ b/dask_cuda/tests/test_spill.py @@ -10,10 +10,8 @@ from distributed.metrics import time from distributed.sizeof import sizeof from distributed.utils_test import gen_cluster, gen_test, loop # noqa: F401 -from distributed.worker import Worker from dask_cuda import LocalCUDACluster, utils -from dask_cuda.device_host_file import DeviceHostFile if utils.get_device_total_memory() < 1e10: pytest.skip("Not enough GPU memory", allow_module_level=True) @@ -80,85 +78,6 @@ def delayed_worker_assert(total_size, device_chunk_overhead, serialized_chunk_ov ) -# @pytest.mark.xfail(reason="https://github.com/rapidsai/dask-cuda/issues/79") -@pytest.mark.parametrize( - "params", - [ - { - "device_memory_limit": int(200e6), - "memory_limit": int(800e6), - "host_target": 0.0, - "host_spill": 0.0, - "host_pause": 0.0, - "spills_to_disk": False, - }, - { - "device_memory_limit": int(200e6), - "memory_limit": int(200e6), - "host_target": 0.0, - "host_spill": 0.0, - "host_pause": 0.0, - "spills_to_disk": True, - }, - { - "device_memory_limit": int(200e6), - "memory_limit": 0, - "host_target": 0.0, - "host_spill": 0.0, - "host_pause": 0.0, - "spills_to_disk": False, - }, - ], -) -def test_cupy_device_spill(params): - @gen_cluster( - client=True, -
nthreads=[("127.0.0.1", 1)], - Worker=Worker, - timeout=60, - worker_kwargs={ - "memory_limit": params["memory_limit"], - "data": DeviceHostFile( - device_memory_limit=params["device_memory_limit"], - memory_limit=params["memory_limit"], - ), - }, - config={ - "distributed.comm.timeouts.connect": "20s", - "distributed.worker.memory.target": params["host_target"], - "distributed.worker.memory.spill": params["host_spill"], - "distributed.worker.memory.pause": params["host_pause"], - }, - ) - def test_device_spill(client, scheduler, worker): - cupy = pytest.importorskip("cupy") - rs = da.random.RandomState(RandomState=cupy.random.RandomState) - x = rs.random(int(50e6), chunks=2e6) - - xx = x.persist() - yield wait(xx) - - # Allow up to 1024 bytes overhead per chunk serialized - yield client.run(worker_assert, x.nbytes, 1024, 1024) - - y = client.compute(x.sum()) - res = yield y - - assert (abs(res / x.size) - 0.5) < 1e-3 - - yield client.run(worker_assert, x.nbytes, 1024, 1024) - host_chunks = yield client.run(lambda: len(get_worker().data.host)) - disk_chunks = yield client.run(lambda: len(get_worker().data.disk or list())) - for hc, dc in zip(host_chunks.values(), disk_chunks.values()): - if params["spills_to_disk"]: - assert dc > 0 - else: - assert hc > 0 - assert dc == 0 - - test_device_spill() - - @pytest.mark.parametrize( "params", [ @@ -234,95 +153,6 @@ async def test_cupy_cluster_device_spill(params): assert dc == 0 -@pytest.mark.xfail(reason="https://github.com/rapidsai/dask-cuda/issues/79") -@pytest.mark.parametrize( - "params", - [ - { - "device_memory_limit": int(200e6), - "memory_limit": int(800e6), - "host_target": 0.0, - "host_spill": 0.0, - "host_pause": 0.0, - "spills_to_disk": False, - }, - { - "device_memory_limit": int(200e6), - "memory_limit": int(200e6), - "host_target": 0.0, - "host_spill": 0.0, - "host_pause": 0.0, - "spills_to_disk": True, - }, - { - "device_memory_limit": int(200e6), - "memory_limit": 0, - "host_target": 0.0, - "host_spill": 
0.0, - "host_pause": 0.0, - "spills_to_disk": False, - }, - ], -) -def test_cudf_device_spill(params): - @gen_cluster( - client=True, - nthreads=[("127.0.0.1", 1)], - Worker=Worker, - timeout=60, - worker_kwargs={ - "memory_limit": params["memory_limit"], - "data": DeviceHostFile( - device_memory_limit=params["device_memory_limit"], - memory_limit=params["memory_limit"], - ), - }, - config={ - "distributed.comm.timeouts.connect": "20s", - "distributed.worker.memory.target": params["host_target"], - "distributed.worker.memory.spill": params["host_spill"], - "distributed.worker.memory.pause": params["host_pause"], - }, - ) - def test_device_spill(client, scheduler, worker): - cudf = pytest.importorskip("cudf") - # There's a known issue with datetime64: - # https://github.com/numpy/numpy/issues/4983#issuecomment-441332940 - # The same error above happens when spilling datetime64 to disk - cdf = ( - dask.datasets.timeseries(dtypes={"x": int, "y": float}, freq="100ms") - .reset_index(drop=True) - .map_partitions(cudf.from_pandas) - ) - - sizes = yield client.compute(cdf.map_partitions(lambda df: df.__sizeof__())) - sizes = sizes.tolist() - nbytes = sum(sizes) - part_index_nbytes = (yield client.compute(cdf.partitions[0].index)).__sizeof__() - - cdf2 = cdf.persist() - yield wait(cdf2) - - del cdf - - host_chunks = yield client.run(lambda: len(get_worker().data.host)) - disk_chunks = yield client.run(lambda: len(get_worker().data.disk or list())) - for hc, dc in zip(host_chunks.values(), disk_chunks.values()): - if params["spills_to_disk"]: - assert dc > 0 - else: - assert hc > 0 - assert dc == 0 - - yield client.run(worker_assert, nbytes, 32, 2048 + part_index_nbytes) - - del cdf2 - - yield client.run(delayed_worker_assert, 0, 0, 0) - - test_device_spill() - - @pytest.mark.xfail(reason="https://github.com/rapidsai/dask-cuda/issues/79") @pytest.mark.parametrize( "params", @@ -358,7 +188,7 @@ async def test_cudf_cluster_device_spill(params): cudf = 
pytest.importorskip("cudf") with dask.config.set({"distributed.worker.memory.terminate": False}): async with LocalCUDACluster( - 1, + n_workers=1, device_memory_limit=params["device_memory_limit"], memory_limit=params["memory_limit"], memory_target_fraction=params["host_target"],