Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor gather_dep #6388

Merged
merged 14 commits into from
Jun 10, 2022
5 changes: 2 additions & 3 deletions distributed/tests/test_stories.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,7 @@ async def test_worker_story_with_deps(c, s, a, b):
# Story now includes randomized stimulus_ids and timestamps.
story = b.story("res")
stimulus_ids = {ev[-2].rsplit("-", 1)[0] for ev in story}
assert stimulus_ids == {"compute-task", "task-finished"}

assert stimulus_ids == {"compute-task", "gather-dep-success", "task-finished"}
# This is a simple transition log
expected = [
("res", "compute-task", "released"),
Expand All @@ -153,7 +152,7 @@ async def test_worker_story_with_deps(c, s, a, b):

story = b.story("dep")
stimulus_ids = {ev[-2].rsplit("-", 1)[0] for ev in story}
assert stimulus_ids == {"compute-task"}
assert stimulus_ids == {"compute-task", "gather-dep-success"}
expected = [
("dep", "ensure-task-exists", "released"),
("dep", "released", "fetch", "fetch", {}),
Expand Down
82 changes: 82 additions & 0 deletions distributed/tests/test_worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
ExecuteSuccessEvent,
Instruction,
RecommendationsConflict,
RefreshWhoHasEvent,
ReleaseWorkerDataMsg,
RescheduleEvent,
RescheduleMsg,
Expand Down Expand Up @@ -603,3 +604,84 @@ async def test_missing_to_waiting(c, s, w1, w2, w3):
await w1.close()

await f1


@gen_cluster(client=True, nthreads=[("", 1)] * 3)
async def test_fetch_to_missing_on_refresh_who_has(c, s, w1, w2, w3):
    """
    1. Two tasks, x and y, are only available on a busy worker.
    The worker sends request-refresh-who-has to the scheduler.
    2. The scheduler responds that x has become missing, while y has gained an
    additional replica
    3. The handler for RefreshWhoHasEvent empties x.who_has and recommends a transition
    to missing.
    4. Before the recommendation can be implemented, the same event invokes
    _ensure_communicating to let y transition to flight. This in turn pops x from
    data_needed - but x has an empty who_has, which is an exceptional situation.
    5. The transition fetch->missing is executed, but x is no longer in
    data_needed - another exceptional situation.
    """
    x = c.submit(inc, 1, key="x", workers=[w1.address])
    y = c.submit(inc, 2, key="y", workers=[w1.address])
    await wait([x, y])
    # Prevent w1 from serving any get_data requests, so that fetches from it
    # report the worker as busy
    w1.total_in_connections = 0
    s.request_acquire_replicas(w3.address, ["x", "y"], stimulus_id="test1")

    # The tasks will now flip-flop between fetch and flight every 150ms
    # (see Worker.retry_busy_worker_later)
    await wait_for_state("x", "fetch", w3)
    await wait_for_state("y", "fetch", w3)
    assert w1.address in w3.busy_workers
    # w3 sent {op: request-refresh-who-has, keys: [x, y]}
    # There also may have been enough time for a refresh-who-has message to come back,
    # which reiterated what the w3 already knew:
    # {op: refresh-who-has, who_has={x: [w1.address], y: [w1.address]}}

    # Let's instead simulate that, while request-refresh-who-has was in transit,
    # w2 gained a replica of y and then subsequently w1 closed down.
    # When request-refresh-who-has lands, the scheduler will respond:
    # {op: refresh-who-has, who_has={x: [], y: [w2.address]}}
    w3.handle_stimulus(
        # set() — not `{}`, which is an empty *dict* — for type consistency with
        # the `{w2.address}` set literal next to it
        RefreshWhoHasEvent(who_has={"x": set(), "y": {w2.address}}, stimulus_id="test3")
    )
    assert w3.tasks["x"].state == "missing"
    assert w3.tasks["y"].state == "flight"
    assert w3.tasks["y"].who_has == {w2.address}


@gen_cluster(client=True, nthreads=[("", 1)])
async def test_fetch_to_missing_on_network_failure(c, s, a):
    """
    1. Multiple tasks can be fetched from the same worker
    2. Only some transition to flight, while the others remain in fetch state, e.g.
    because of excessive size
    3. gather_dep returns GatherDepNetworkFailureEvent
    4. The event empties has_what. This impacts both the tasks in fetch as well as those
    in flight. The event recommends a transition to missing for all tasks with empty
    who_has.
    5. Before the recommendation can be implemented, the same event invokes
    _ensure_communicating, which pops the tasks in fetch state from data_needed - but
    they have an empty who_has, which is an exceptional situation.
    6. The transition fetch->missing is executed, but the tasks are no longer in
    data_needed - another exceptional situation.
    """
    block_get_data = asyncio.Event()

    class BlockedBreakingWorker(Worker):
        # get_data blocks until the test unblocks it, then fails with OSError,
        # simulating a network failure in the middle of a dependency fetch
        async def get_data(self, comm, *args, **kwargs):
            await block_get_data.wait()
            raise OSError("fake error")

    async with BlockedBreakingWorker(s.address) as b:
        x = c.submit(inc, 1, key="x", workers=[b.address])
        y = c.submit(inc, 2, key="y", workers=[b.address])
        await wait([x, y])
        # x reaches flight: its gather_dep starts and then blocks inside get_data
        s.request_acquire_replicas(a.address, ["x"], stimulus_id="test_x")
        await wait_for_state("x", "flight", a)
        # y stays in fetch — presumably because the fetch to b is already in
        # progress for x (NOTE(review): confirm against _ensure_communicating)
        s.request_acquire_replicas(a.address, ["y"], stimulus_id="test_y")
        await wait_for_state("y", "fetch", a)

        # Unblock get_data; it raises OSError, triggering the network-failure path
        block_get_data.set()

        # Both the in-flight task and the still-queued fetch end up missing
        await wait_for_state("x", "missing", a)
        await wait_for_state("y", "missing", a)