Remove gen_cluster spill tests #758

Merged
172 changes: 1 addition & 171 deletions dask_cuda/tests/test_spill.py
@@ -10,10 +10,8 @@
 from distributed.metrics import time
 from distributed.sizeof import sizeof
 from distributed.utils_test import gen_cluster, gen_test, loop # noqa: F401
-from distributed.worker import Worker

 from dask_cuda import LocalCUDACluster, utils
-from dask_cuda.device_host_file import DeviceHostFile

 if utils.get_device_total_memory() < 1e10:
     pytest.skip("Not enough GPU memory", allow_module_level=True)
@@ -80,85 +78,6 @@ def delayed_worker_assert(total_size, device_chunk_overhead, serialized_chunk_ov
     )


-# @pytest.mark.xfail(reason="https://github.com/rapidsai/dask-cuda/issues/79")
-@pytest.mark.parametrize(
-    "params",
-    [
-        {
-            "device_memory_limit": int(200e6),
-            "memory_limit": int(800e6),
-            "host_target": 0.0,
-            "host_spill": 0.0,
-            "host_pause": 0.0,
-            "spills_to_disk": False,
-        },
-        {
-            "device_memory_limit": int(200e6),
-            "memory_limit": int(200e6),
-            "host_target": 0.0,
-            "host_spill": 0.0,
-            "host_pause": 0.0,
-            "spills_to_disk": True,
-        },
-        {
-            "device_memory_limit": int(200e6),
-            "memory_limit": 0,
-            "host_target": 0.0,
-            "host_spill": 0.0,
-            "host_pause": 0.0,
-            "spills_to_disk": False,
-        },
-    ],
-)
-def test_cupy_device_spill(params):
-    @gen_cluster(
-        client=True,
-        nthreads=[("127.0.0.1", 1)],
-        Worker=Worker,
-        timeout=60,
-        worker_kwargs={
-            "memory_limit": params["memory_limit"],
-            "data": DeviceHostFile(
-                device_memory_limit=params["device_memory_limit"],
-                memory_limit=params["memory_limit"],
-            ),
-        },
-        config={
-            "distributed.comm.timeouts.connect": "20s",
-            "distributed.worker.memory.target": params["host_target"],
-            "distributed.worker.memory.spill": params["host_spill"],
-            "distributed.worker.memory.pause": params["host_pause"],
-        },
-    )
-    def test_device_spill(client, scheduler, worker):
-        cupy = pytest.importorskip("cupy")
-        rs = da.random.RandomState(RandomState=cupy.random.RandomState)
-        x = rs.random(int(50e6), chunks=2e6)
-
-        xx = x.persist()
-        yield wait(xx)
-
-        # Allow up to 1024 bytes overhead per chunk serialized
-        yield client.run(worker_assert, x.nbytes, 1024, 1024)
-
-        y = client.compute(x.sum())
-        res = yield y
-
-        assert (abs(res / x.size) - 0.5) < 1e-3
-
-        yield client.run(worker_assert, x.nbytes, 1024, 1024)
-        host_chunks = yield client.run(lambda: len(get_worker().data.host))
-        disk_chunks = yield client.run(lambda: len(get_worker().data.disk or list()))
-        for hc, dc in zip(host_chunks.values(), disk_chunks.values()):
-            if params["spills_to_disk"]:
-                assert dc > 0
-            else:
-                assert hc > 0
-                assert dc == 0
-
-    test_device_spill()
-
-
 @pytest.mark.parametrize(
     "params",
     [
@@ -234,95 +153,6 @@ async def test_cupy_cluster_device_spill(params):
                         assert dc == 0


-@pytest.mark.xfail(reason="https://github.com/rapidsai/dask-cuda/issues/79")
-@pytest.mark.parametrize(
-    "params",
-    [
-        {
-            "device_memory_limit": int(200e6),
-            "memory_limit": int(800e6),
-            "host_target": 0.0,
-            "host_spill": 0.0,
-            "host_pause": 0.0,
-            "spills_to_disk": False,
-        },
-        {
-            "device_memory_limit": int(200e6),
-            "memory_limit": int(200e6),
-            "host_target": 0.0,
-            "host_spill": 0.0,
-            "host_pause": 0.0,
-            "spills_to_disk": True,
-        },
-        {
-            "device_memory_limit": int(200e6),
-            "memory_limit": 0,
-            "host_target": 0.0,
-            "host_spill": 0.0,
-            "host_pause": 0.0,
-            "spills_to_disk": False,
-        },
-    ],
-)
-def test_cudf_device_spill(params):
-    @gen_cluster(
-        client=True,
-        nthreads=[("127.0.0.1", 1)],
-        Worker=Worker,
-        timeout=60,
-        worker_kwargs={
-            "memory_limit": params["memory_limit"],
-            "data": DeviceHostFile(
-                device_memory_limit=params["device_memory_limit"],
-                memory_limit=params["memory_limit"],
-            ),
-        },
-        config={
-            "distributed.comm.timeouts.connect": "20s",
-            "distributed.worker.memory.target": params["host_target"],
-            "distributed.worker.memory.spill": params["host_spill"],
-            "distributed.worker.memory.pause": params["host_pause"],
-        },
-    )
-    def test_device_spill(client, scheduler, worker):
-        cudf = pytest.importorskip("cudf")
-        # There's a known issue with datetime64:
-        # https://github.com/numpy/numpy/issues/4983#issuecomment-441332940
-        # The same error above happens when spilling datetime64 to disk
-        cdf = (
-            dask.datasets.timeseries(dtypes={"x": int, "y": float}, freq="100ms")
-            .reset_index(drop=True)
-            .map_partitions(cudf.from_pandas)
-        )
-
-        sizes = yield client.compute(cdf.map_partitions(lambda df: df.__sizeof__()))
-        sizes = sizes.tolist()
-        nbytes = sum(sizes)
-        part_index_nbytes = (yield client.compute(cdf.partitions[0].index)).__sizeof__()
-
-        cdf2 = cdf.persist()
-        yield wait(cdf2)
-
-        del cdf
-
-        host_chunks = yield client.run(lambda: len(get_worker().data.host))
-        disk_chunks = yield client.run(lambda: len(get_worker().data.disk or list()))
-        for hc, dc in zip(host_chunks.values(), disk_chunks.values()):
-            if params["spills_to_disk"]:
-                assert dc > 0
-            else:
-                assert hc > 0
-                assert dc == 0
-
-        yield client.run(worker_assert, nbytes, 32, 2048 + part_index_nbytes)
-
-        del cdf2
-
-        yield client.run(delayed_worker_assert, 0, 0, 0)
-
-    test_device_spill()
-
-
 @pytest.mark.xfail(reason="https://github.com/rapidsai/dask-cuda/issues/79")
 @pytest.mark.parametrize(
     "params",
@@ -358,7 +188,7 @@ async def test_cudf_cluster_device_spill(params):
     cudf = pytest.importorskip("cudf")
     with dask.config.set({"distributed.worker.memory.terminate": False}):
         async with LocalCUDACluster(
-            1,
+            n_workers=1,
            device_memory_limit=params["device_memory_limit"],
            memory_limit=params["memory_limit"],
            memory_target_fraction=params["host_target"],
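For reference, the spill tests this PR keeps are the async `LocalCUDACluster` variants visible in the final hunk above. The sketch below shows only the overall shape of that pattern; the `gen_test`/`Client` wiring, the single parametrization, the `asynchronous=True` flag, and the placeholder body are assumptions rather than the exact contents of `test_spill.py`.

```python
# Illustrative sketch of the async-cluster spill test pattern retained by this
# PR, reconstructed from the surviving hunk above. Anything not shown in the
# diff (gen_test/Client wiring, the parametrization, the "..." body) is an
# assumption, not the real test.
import dask
import pytest
from distributed import Client
from distributed.utils_test import gen_test

from dask_cuda import LocalCUDACluster


@pytest.mark.parametrize(
    "params",
    [
        {
            "device_memory_limit": int(200e6),
            "memory_limit": int(800e6),
            "host_target": 0.0,
        }
    ],
)
@gen_test(timeout=60)
async def test_cluster_device_spill_sketch(params):
    pytest.importorskip("cudf")
    with dask.config.set({"distributed.worker.memory.terminate": False}):
        async with LocalCUDACluster(
            n_workers=1,  # passed by keyword; the one addition in this PR
            device_memory_limit=params["device_memory_limit"],
            memory_limit=params["memory_limit"],
            memory_target_fraction=params["host_target"],
            asynchronous=True,
        ) as cluster:
            async with Client(cluster, asynchronous=True) as client:
                # Build a cuDF-backed collection, persist it, and assert that
                # data spills to host (and to disk, when configured), as the
                # retained cluster tests do.
                ...
```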