Skip to content

Commit

Permalink
Unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Oct 18, 2021
1 parent 9b04db9 commit fdb5e4c
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 7 deletions.
26 changes: 26 additions & 0 deletions distributed/tests/test_scheduler.py
Expand Up @@ -3218,3 +3218,29 @@ async def test_set_restrictions(c, s, a, b):
assert s.tasks[f.key].worker_restrictions == {a.address}
s.reschedule(f)
await f


@gen_cluster(
client=True,
nthreads=[("", 1, {}), ("", 1, {"memory_pause_fraction": 1e-15}), ("", 1, {})],
)
async def test_avoid_paused_workers(c, s, w1, w2, w3):
while s.workers[w2.address].status != Status.paused:
await asyncio.sleep(0.01)
futures = c.map(slowinc, range(8), delay=0.1)
while (len(w1.tasks), len(w2.tasks), len(w3.tasks)) != (4, 0, 4):
await asyncio.sleep(0.01)


@gen_cluster(client=True, nthreads=[("", 1, {"memory_pause_fraction": 1e-15})])
async def test_unpause_schedules_unrannable_tasks(c, s, a):
while s.workers[a.address].status != Status.paused:
await asyncio.sleep(0.01)

fut = c.submit(inc, 1, key="x")
while not s.unrunnable:
await asyncio.sleep(.001)
assert next(iter(s.unrunnable)).key == "x"

a.memory_pause_fraction = 0.8
assert await fut == 2
4 changes: 2 additions & 2 deletions distributed/utils_test.py
Expand Up @@ -69,7 +69,7 @@
sync,
thread_state,
)
from .worker import Worker
from .worker import RUNNING, Worker

try:
import dask.array # register config
Expand Down Expand Up @@ -1549,7 +1549,7 @@ def check_instances():
for w in Worker._instances:
with suppress(RuntimeError): # closed IOLoop
w.loop.add_callback(w.close, report=False, executor_wait=False)
if w.status in (Status.running, Status.paused):
if w.status in RUNNING:
w.loop.add_callback(w.close)
Worker._instances.clear()

Expand Down
6 changes: 1 addition & 5 deletions distributed/worker.py
Expand Up @@ -3673,11 +3673,7 @@ def get_worker() -> Worker:
return thread_state.execution_state["worker"]
except AttributeError:
try:
return first(
w
for w in Worker._instances
if w.status in (Status.running, Status.paused)
)
return first(w for w in Worker._instances if w.status in RUNNING)
except StopIteration:
raise ValueError("No workers found")

Expand Down

0 comments on commit fdb5e4c

Please sign in to comment.