Make P2P more configurable #8469

Draft
wants to merge 1 commit into base: main
1 change: 1 addition & 0 deletions distributed/config.py
@@ -57,6 +57,7 @@
"distributed.scheduler.events-log-length": "distributed.admin.low-level-log-length",
"recent-messages-log-length": "distributed.admin.low-level-log-length",
"distributed.comm.recent-messages-log-length": "distributed.admin.low-level-log-length",
"distributed.p2p.disk": "distributed.p2p.storage.disk",
}

# Affects yaml and env variables configs, as well as calls to dask.config.set()
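Since the comment above notes that these aliases also cover dask.config.set() calls, configuration written against the deprecated key should resolve to the new location. A minimal, hedged sketch of that behaviour (the exact warning category is an assumption):

import dask

# Setting the deprecated key should be redirected to the new location and
# emit a deprecation warning (assumed FutureWarning), per the comment above.
with dask.config.set({"distributed.p2p.disk": False}):
    print(dask.config.get("distributed.p2p.storage.disk"))  # expected: False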
48 changes: 44 additions & 4 deletions distributed/distributed-schema.yaml
@@ -1034,6 +1034,28 @@ properties:
description: Configuration settings for Dask communications specific to P2P
properties:

buffer:
type:
- string
- integer
description: |
The maximum amount of data P2P's comm buffers may hold in memory per worker.
This limit is not absolute; it is used to apply back pressure.
concurrency:
type: integer
description: Number of concurrent background tasks used for IO-intensive operations per P2P comm buffer.
message-bytes-limit:
type:
- string
- integer
description: |
The maximum amount of data for P2P to send to another worker in a single operation.

Data is sent in batches, and if the first shard is larger than this value,
that shard will still be sent to ensure progress. Hence, this limit is not absolute.
Note that this limit applies to a single send operation and a worker may send data to
multiple workers in parallel.

retry:
type: object
description: |
@@ -1055,10 +1077,28 @@
max:
type: string
description: The maximum delay between retries
disk:
type: boolean
description: |
Whether P2P stores intermediate data on disk instead of in memory.

storage:
type: object
description: Configuration settings for P2P storage
properties:

buffer:
type:
- string
- integer
description: |
The maximum amount of data P2P's disk buffers may hold in memory per worker.
This limit is not absolute; it is used to apply back pressure.
disk:
type: boolean
description: |
Whether P2P stores intermediate data on disk instead of in memory.
threads:
type:
- integer
- "null"
description: Number of threads used for CPU-intensive operations per worker. Defaults to the number of worker threads.

dashboard:
type: object
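Since the byte-sized options above accept either a string or an integer, and the code changes below parse strings with dask.utils.parse_bytes, both spellings should be equivalent. A sketch using the new keys (values here are arbitrary overrides, not recommendations):

import dask

dask.config.set(
    {
        "distributed.p2p.comm.buffer": "512 MiB",  # or 512 * 2**20 as an integer
        "distributed.p2p.comm.concurrency": 10,
        "distributed.p2p.comm.message-bytes-limit": "2 MiB",
        "distributed.p2p.storage.buffer": "100 MiB",
        "distributed.p2p.storage.disk": True,
        "distributed.p2p.threads": None,  # null: default to the worker's thread count
    }
)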
8 changes: 7 additions & 1 deletion distributed/distributed.yaml
@@ -299,12 +299,18 @@ distributed:

p2p:
comm:
buffer: 1 GiB
concurrency: 10
message-bytes-limit: 2 MiB
retry:
count: 10
delay:
min: 1s # the first non-zero delay between re-tries
max: 30s # the maximum delay between re-tries
disk: True
storage:
buffer: 100 MiB
disk: True
threads: null

###################
# Bokeh dashboard #
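For context, the storage.disk default above is the knob exercised by the tests further down; a hedged client-side sketch (assumes a local distributed cluster and that the configuration is visible where the graph is built, as in the tests):

import pandas as pd

import dask
import dask.dataframe as dd
from distributed import Client

client = Client()  # P2P shuffling requires a distributed scheduler
ddf = dd.from_pandas(pd.DataFrame({"x": range(100), "y": range(100)}), npartitions=10)

# Keep P2P intermediates in memory instead of writing them to disk.
with dask.config.set(
    {"dataframe.shuffle.method": "p2p", "distributed.p2p.storage.disk": False}
):
    ddf.shuffle("x").compute()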
1 change: 1 addition & 0 deletions distributed/shuffle/_buffer.py
@@ -150,6 +150,7 @@ def _continue() -> bool:
if self.max_message_size > 0:
size = 0
shards = []
# FIXME: We always exceed the limit, not just on the first shard.
while size < self.max_message_size:
try:
shard = self.shards[part_id].pop()
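The FIXME above refers to the accumulation loop: shards are popped until the running size reaches max_message_size, so the shard that crosses the threshold is still included and the batch can exceed the limit. An illustrative, standalone sketch of that logic (not the buffer's actual data structures):

def batch_shards(shards: list[bytes], max_message_size: int) -> list[bytes]:
    """Pop shards until the accumulated size reaches max_message_size.

    The shard that crosses the threshold is still included, so the batch may
    exceed the limit; the limit only guarantees forward progress.
    """
    batch: list[bytes] = []
    size = 0
    while shards and size < max_message_size:
        shard = shards.pop()
        batch.append(shard)
        size += len(shard)
    return batch


# Both shards are sent together even though 1200 bytes exceeds the 1000-byte limit.
assert len(batch_shards([b"a" * 600, b"b" * 600], max_message_size=1000)) == 2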
9 changes: 3 additions & 6 deletions distributed/shuffle/_comms.py
@@ -3,8 +3,6 @@
from collections.abc import Awaitable, Callable
from typing import Any

from dask.utils import parse_bytes

from distributed.metrics import context_meter
from distributed.shuffle._disk import ShardsBuffer
from distributed.shuffle._limiter import ResourceLimiter
@@ -49,18 +47,17 @@ class CommShardsBuffer(ShardsBuffer):
Number of background tasks to run.
"""

max_message_size = parse_bytes("2 MiB")

def __init__(
self,
send: Callable[[str, list[tuple[Any, Any]]], Awaitable[None]],
memory_limiter: ResourceLimiter,
concurrency_limit: int = 10,
max_message_size: int,
concurrency_limit: int,
):
super().__init__(
memory_limiter=memory_limiter,
concurrency_limit=concurrency_limit,
max_message_size=CommShardsBuffer.max_message_size,
max_message_size=max_message_size,
)
self.send = send

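With the hard-coded 2 MiB default removed, callers now pass both limits explicitly. A construction sketch mirroring the updated tests below (the send coroutine is a placeholder):

from dask.utils import parse_bytes

from distributed.shuffle._comms import CommShardsBuffer
from distributed.shuffle._limiter import ResourceLimiter


async def send(address, shards):
    ...  # ship the listed shards to the worker at `address`


buffer = CommShardsBuffer(
    send=send,
    max_message_size=parse_bytes("2 MiB"),
    memory_limiter=ResourceLimiter(parse_bytes("1 GiB")),
    concurrency_limit=10,
)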
11 changes: 9 additions & 2 deletions distributed/shuffle/_core.py
@@ -19,7 +19,7 @@
import dask.config
from dask.core import flatten
from dask.typing import Key
from dask.utils import parse_timedelta
from dask.utils import parse_bytes, parse_timedelta

from distributed.core import PooledRPCCall
from distributed.exceptions import Reschedule
@@ -115,8 +115,15 @@
self._disk_buffer = MemoryShardsBuffer(deserialize=self.deserialize)

with self._capture_metrics("background-comms"):
max_message_size = parse_bytes(
dask.config.get("distributed.p2p.comm.message-bytes-limit")
)
concurrency_limit = dask.config.get("distributed.p2p.comm.concurrency")
self._comm_buffer = CommShardsBuffer(
send=self.send, memory_limiter=memory_limiter_comms
send=self.send,
max_message_size=max_message_size,
memory_limiter=memory_limiter_comms,
concurrency_limit=concurrency_limit,
)

# TODO: reduce number of connections to number of workers
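For reference, the values read here go through dask.utils.parse_bytes, so the configured strings resolve to plain byte counts (the shipped defaults are shown; integer passthrough is how parse_bytes is expected to behave):

from dask.utils import parse_bytes

assert parse_bytes("2 MiB") == 2 * 2**20  # message-bytes-limit default
assert parse_bytes("1 GiB") == 2**30  # comm.buffer default
assert parse_bytes(2_097_152) == 2_097_152  # integers pass through unchanged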
2 changes: 1 addition & 1 deletion distributed/shuffle/_rechunk.py
@@ -393,7 +393,7 @@
_barrier_key = barrier_key(ShuffleId(partial_token))
slice_name = f"rechunk-slice-{partial_token}"
transfer_name = f"rechunk-transfer-{partial_token}"
disk: bool = dask.config.get("distributed.p2p.disk")
disk: bool = dask.config.get("distributed.p2p.storage.disk")

ndim = len(x.shape)

2 changes: 1 addition & 1 deletion distributed/shuffle/_shuffle.py
@@ -131,7 +131,7 @@
)

name = f"shuffle_p2p-{token}"
disk: bool = dask.config.get("distributed.p2p.disk")
disk: bool = dask.config.get("distributed.p2p.storage.disk")

layer = P2PShuffleLayer(
name,
12 changes: 9 additions & 3 deletions distributed/shuffle/_worker_plugin.py
@@ -6,6 +6,7 @@
from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING, Any, overload

import dask
from dask.context import thread_state
from dask.typing import Key
from dask.utils import parse_bytes
@@ -279,14 +280,19 @@ def setup(self, worker: Worker) -> None:
# Initialize
self.worker = worker
self.shuffle_runs = _ShuffleRunManager(self)
comm_limit = parse_bytes(dask.config.get("distributed.p2p.comm.buffer"))
self.memory_limiter_comms = ResourceLimiter(
parse_bytes("100 MiB"), metrics_label="p2p-comms-limiter"
comm_limit, metrics_label="p2p-comms-limiter"
)
storage_limit = parse_bytes(dask.config.get("distributed.p2p.storage.buffer"))
self.memory_limiter_disk = ResourceLimiter(
parse_bytes("1 GiB"), metrics_label="p2p-disk-limiter"
storage_limit, metrics_label="p2p-disk-limiter"
)
self.closed = False
self._executor = ThreadPoolExecutor(self.worker.state.nthreads)
nthreads = (
dask.config.get("distributed.p2p.threads") or self.worker.state.nthreads
)
self._executor = ThreadPoolExecutor(nthreads)

def __str__(self) -> str:
return f"ShuffleWorkerPlugin on {self.worker.address}"
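Because these limits and the thread count are read in setup(), they have to be in the worker's configuration when the worker starts rather than set per computation. A hedged sketch with a local cluster (it assumes locally spawned workers inherit the client's configuration; the numbers are arbitrary):

import dask
from distributed import Client, LocalCluster

with dask.config.set(
    {
        "distributed.p2p.comm.buffer": "256 MiB",
        "distributed.p2p.storage.buffer": "512 MiB",
        "distributed.p2p.threads": 4,  # instead of defaulting to worker nthreads
    }
):
    cluster = LocalCluster(n_workers=2, threads_per_worker=2)
    client = Client(cluster)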
25 changes: 20 additions & 5 deletions distributed/shuffle/tests/test_comm_buffer.py
@@ -20,7 +20,11 @@ async def test_basic(tmp_path):
async def send(address, shards):
d[address].extend(shards)

mc = CommShardsBuffer(send=send, memory_limiter=ResourceLimiter(None))
mc = CommShardsBuffer(
send=send,
max_message_size=parse_bytes("2 MiB"),
memory_limiter=ResourceLimiter(None),
)
await mc.write({"x": b"0" * 1000, "y": b"1" * 500})
await mc.write({"x": b"0" * 1000, "y": b"1" * 500})

@@ -37,7 +41,11 @@ async def test_exceptions(tmp_path):
async def send(address, shards):
raise Exception(123)

mc = CommShardsBuffer(send=send, memory_limiter=ResourceLimiter(None))
mc = CommShardsBuffer(
send=send,
max_message_size=parse_bytes("2 MiB"),
memory_limiter=ResourceLimiter(None),
)
await mc.write({"x": b"0" * 1000, "y": b"1" * 500})

while not mc._exception:
@@ -64,7 +72,10 @@ async def send(address, shards):
sending_first.set()

mc = CommShardsBuffer(
send=send, concurrency_limit=1, memory_limiter=ResourceLimiter(None)
send=send,
max_message_size=parse_bytes("2 MiB"),
concurrency_limit=1,
memory_limiter=ResourceLimiter(None),
)
await mc.write({"x": b"0", "y": b"1"})
await mc.write({"x": b"0", "y": b"1"})
@@ -95,7 +106,9 @@ async def send(address, shards):
nshards = 10
nputs = 20
comm_buffer = CommShardsBuffer(
send=send, memory_limiter=ResourceLimiter(parse_bytes("100 MiB"))
send=send,
max_message_size=parse_bytes("2 MiB"),
memory_limiter=ResourceLimiter(parse_bytes("100 MiB")),
)
payload = {
x: gen_bytes(frac, comm_buffer.memory_limiter.limit) for x in range(nshards)
@@ -136,7 +149,9 @@ async def send(address, shards):
nshards = 10
nputs = 20
comm_buffer = CommShardsBuffer(
send=send, memory_limiter=ResourceLimiter(parse_bytes("100 MiB"))
send=send,
max_message_size=parse_bytes("2 MiB"),
memory_limiter=ResourceLimiter(parse_bytes("100 MiB")),
)
payload = {
x: gen_bytes(frac, comm_buffer.memory_limiter.limit) for x in range(nshards)
2 changes: 1 addition & 1 deletion distributed/shuffle/tests/test_merge.py
@@ -180,7 +180,7 @@ async def test_merge(c, s, a, b, how, disk):
b = dd.repartition(B, [0, 2, 5])

with dask.config.set({"dataframe.shuffle.method": "p2p"}):
with dask.config.set({"distributed.p2p.disk": disk}):
with dask.config.set({"distributed.p2p.storage.disk": disk}):
joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
res = await c.compute(joined)
assert_eq(
6 changes: 3 additions & 3 deletions distributed/shuffle/tests/test_rechunk.py
@@ -274,7 +274,7 @@ async def test_rechunk_2d(c, s, *ws, disk):
a = np.random.default_rng().uniform(0, 1, 300).reshape((10, 30))
x = da.from_array(a, chunks=((1, 2, 3, 4), (5,) * 6))
new = ((5, 5), (15,) * 2)
with dask.config.set({"distributed.p2p.disk": disk}):
with dask.config.set({"distributed.p2p.storage.disk": disk}):
x2 = rechunk(x, chunks=new, method="p2p")
assert x2.chunks == new
assert np.all(await c.compute(x2) == a)
@@ -293,7 +293,7 @@ async def test_rechunk_4d(c, s, *ws, disk):
a = np.random.default_rng().uniform(0, 1, 10000).reshape((10,) * 4)
x = da.from_array(a, chunks=old)
new = ((10,),) * 4
with dask.config.set({"distributed.p2p.disk": disk}):
with dask.config.set({"distributed.p2p.storage.disk": disk}):
x2 = rechunk(x, chunks=new, method="p2p")
assert x2.chunks == new
await c.compute(x2)
@@ -1200,7 +1200,7 @@ async def test_preserve_writeable_flag(c, s, a, b):
assert out.tolist() == [True, True]


@gen_cluster(client=True, config={"distributed.p2p.disk": False})
@gen_cluster(client=True, config={"distributed.p2p.storage.disk": False})
async def test_rechunk_in_memory_shards_dont_share_buffer(c, s, a, b):
"""Test that, if two shards are sent in the same RPC call and they contribute to
different output chunks, downstream tasks don't need to consume all output chunks in
2 changes: 1 addition & 1 deletion distributed/shuffle/tests/test_shuffle.py
@@ -195,7 +195,7 @@ async def test_basic_integration(c, s, a, b, npartitions, disk):
freq="10 s",
)
with dask.config.set(
{"dataframe.shuffle.method": "p2p", "distributed.p2p.disk": disk}
{"dataframe.shuffle.method": "p2p", "distributed.p2p.storage.disk": disk}
):
shuffled = dd.shuffle.shuffle(df, "x", npartitions=npartitions)
if npartitions is None: