New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Paused workers shouldn't steal tasks #5665
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ | |
from distributed import Lock, Nanny, Worker, wait, worker_client | ||
from distributed.compatibility import LINUX, WINDOWS | ||
from distributed.config import config | ||
from distributed.core import Status | ||
from distributed.metrics import time | ||
from distributed.scheduler import key_split | ||
from distributed.system import MEMORY_LIMIT | ||
|
@@ -816,28 +817,47 @@ async def test_steal_twice(c, s, a, b): | |
|
||
while len(s.tasks) < 100: # tasks are all allocated | ||
await asyncio.sleep(0.01) | ||
# Wait for b to start stealing tasks | ||
while len(b.tasks) < 30: | ||
await asyncio.sleep(0.01) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Without this, the title of the test, "steal twice", makes no sense! |
||
|
||
# Army of new workers arrives to help | ||
workers = await asyncio.gather(*(Worker(s.address, loop=s.loop) for _ in range(20))) | ||
workers = await asyncio.gather(*(Worker(s.address) for _ in range(20))) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Note: the changes to stealing.py mean that these new workers will start stealing slightly later than before, since they now have to wait to reach running status. It remains to be seen if this causes flakiness. |
||
|
||
await wait(futures) | ||
|
||
has_what = dict(s.has_what) # take snapshot | ||
empty_workers = [w for w, keys in has_what.items() if not len(keys)] | ||
if len(empty_workers) > 2: | ||
pytest.fail( | ||
"Too many workers without keys (%d out of %d)" | ||
% (len(empty_workers), len(has_what)) | ||
) | ||
assert max(map(len, has_what.values())) < 30 | ||
# Note: this includes a and b | ||
empty_workers = [w for w, keys in s.has_what.items() if not keys] | ||
assert ( | ||
len(empty_workers) < 3 | ||
), f"Too many workers without keys ({len(empty_workers)} out of {len(s.workers)})" | ||
# This also tests that some tasks were stolen from b | ||
# (see `while len(b.tasks) < 30` above) | ||
assert max(map(len, s.has_what.values())) < 30 | ||
|
||
assert a.in_flight_tasks == 0 | ||
assert b.in_flight_tasks == 0 | ||
|
||
await c._close() | ||
await asyncio.gather(*(w.close() for w in workers)) | ||
|
||
|
||
@gen_cluster(client=True, nthreads=[("", 1)] * 3) | ||
async def test_paused_workers_must_not_steal(c, s, w1, w2, w3): | ||
w2.memory_pause_fraction = 1e-15 | ||
while s.workers[w2.address].status != Status.paused: | ||
await asyncio.sleep(0.01) | ||
|
||
x = c.submit(inc, 1, workers=w1.address) | ||
await wait(x) | ||
|
||
futures = [c.submit(slowadd, x, i, delay=0.1) for i in range(10)] | ||
await wait(futures) | ||
|
||
assert w1.data | ||
assert not w2.data | ||
assert w3.data | ||
|
||
|
||
@gen_cluster(client=True) | ||
async def test_dont_steal_already_released(c, s, a, b): | ||
future = c.submit(slowinc, 1, delay=0.05, workers=a.address) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This changes the big-O complexity from O(1) to O(n) in the number of workers, for tasks without restrictions, and only if there are some saturated workers (otherwise the time of topk below, which is O(n), dominates).
I'm unconcerned about this - even with 1k~10k workers, this method runs every 100ms so this should be inconsequential.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The intention here is to filter out paused workers. I'm wondering why a paused worker would be classified as idle in the first place. Isn't a worker being both paused and idle counterintuitive?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An idle worker is a worker that isn't processing any tasks.
A paused worker is a worker that isn't going to process any new tasks beyond the ones currently running (not queued) on it.
According to this definition, a paused worker may or may not be idle.
We could, of course, change the definition of idle to "not processing any tasks AND a prime candidate for new tasks", but I feel that would be counter-intuitive.