Skip to content

Commit

Permalink
Fix flaky test_close_gracefully and test_lifetime (#5677)
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Jan 21, 2022
1 parent af84e40 commit 7d4e6ee
Showing 1 changed file with 52 additions and 16 deletions.
68 changes: 52 additions & 16 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1627,32 +1627,68 @@ async def test_worker_listens_on_same_interface_by_default(cleanup, Worker):

@gen_cluster(client=True)
async def test_close_gracefully(c, s, a, b):
futures = c.map(slowinc, range(200), delay=0.1)
futures = c.map(slowinc, range(200), delay=0.1, workers=[b.address])

while not b.data:
# Note: keys will appear in b.data several milliseconds before they switch to
# status=memory in s.tasks. It's important to sample the in-memory keys from the
# scheduler side, because those that the scheduler thinks are still processing won't
# be replicated by retire_workers().
while True:
mem = {k for k, ts in s.tasks.items() if ts.state == "memory"}
if len(mem) >= 8:
break
await asyncio.sleep(0.01)
mem = set(b.data)
proc = {ts for ts in b.tasks.values() if ts.state == "executing"}
assert proc

assert any(ts for ts in b.tasks.values() if ts.state == "executing")

await b.close_gracefully()

assert b.status == Status.closed
assert b.address not in s.workers
assert mem.issubset(a.data.keys())
for ts in proc:
assert ts.state in ("executing", "memory")

# All tasks that were in memory in b have been copied over to a;
# they have not been recomputed
for key in mem:
assert_worker_story(
a.story(key),
[
(key, "put-in-memory"),
(key, "receive-from-scatter"),
],
strict=True,
)
assert key in a.data


@pytest.mark.slow
@gen_cluster(client=True, nthreads=[])
async def test_lifetime(c, s):
async with Worker(s.address) as a, Worker(s.address, lifetime="1 seconds") as b:
futures = c.map(slowinc, range(200), delay=0.1, worker=[b.address])
await asyncio.sleep(1.5)
assert b.status not in (Status.running, Status.paused)
await b.finished()
assert set(b.data) == set(a.data) # successfully moved data over
@gen_cluster(client=True, nthreads=[("", 1)], timeout=10)
async def test_lifetime(c, s, a):
async with Worker(s.address, lifetime="2 seconds") as b:
futures = c.map(slowinc, range(200), delay=0.1, workers=[b.address])
await asyncio.sleep(1)
assert not a.data
# Note: keys will appear in b.data several milliseconds before they switch to
# status=memory in s.tasks. It's important to sample the in-memory keys from the
# scheduler side, because those that the scheduler thinks are still processing
# won't be replicated by retire_workers().
mem = {k for k, ts in s.tasks.items() if ts.state == "memory"}
assert mem

while b.status != Status.closed:
await asyncio.sleep(0.01)

# All tasks that were in memory in b have been copied over to a;
# they have not been recomputed
for key in mem:
assert_worker_story(
a.story(key),
[
(key, "put-in-memory"),
(key, "receive-from-scatter"),
],
strict=True,
)
assert key in a.data


@gen_cluster(worker_kwargs={"lifetime": "10s", "lifetime_stagger": "2s"})
Expand Down

0 comments on commit 7d4e6ee

Please sign in to comment.