
Refactor gather_dep #6388

Merged 14 commits on Jun 10, 2022

distributed/scheduler.py: 0 additions & 35 deletions

@@ -3023,7 +3023,6 @@ def __init__(
"task-erred": self.handle_task_erred,
"release-worker-data": self.release_worker_data,
"add-keys": self.add_keys,
"missing-data": self.handle_missing_data,
"long-running": self.handle_long_running,
"reschedule": self.reschedule,
"keep-alive": lambda *args, **kwargs: None,
@@ -4667,40 +4666,6 @@ def handle_task_erred(self, key: str, stimulus_id: str, **msg) -> None:
         self._transitions(recommendations, client_msgs, worker_msgs, stimulus_id)
         self.send_all(client_msgs, worker_msgs)

-    def handle_missing_data(
-        self, key: str, worker: str, errant_worker: str, stimulus_id: str
-    ) -> None:
-        """Signal that `errant_worker` does not hold `key`.
-
-        This may either indicate that `errant_worker` is dead or that we may be
-        working with stale data and need to remove `key` from the worker's
-        `has_what`. If no replica of a task is available anymore, the task is
-        transitioned back to released and rescheduled, if possible.
-
-        Parameters
-        ----------
-        key : str
-            Task key that could not be found
-        worker : str
-            Address of the worker informing the scheduler
-        errant_worker : str
-            Address of the worker supposed to hold a replica
-        """
-        logger.debug(f"handle missing data {key=} {worker=} {errant_worker=}")
-        self.log_event(errant_worker, {"action": "missing-data", "key": key})
-
-        ts = self.tasks.get(key)
-        ws = self.workers.get(errant_worker)
-        if not ts or not ws or ws not in ts.who_has:
-            return
-
-        self.remove_replica(ts, ws)
-        if ts.state == "memory" and not ts.who_has:
-            if ts.run_spec:
-                self.transitions({key: "released"}, stimulus_id)
-            else:
-                self.transitions({key: "forgotten"}, stimulus_id)
-
     def release_worker_data(self, key: str, worker: str, stimulus_id: str) -> None:
         ts = self.tasks.get(key)
         ws = self.workers.get(worker)
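
For orientation, here is a minimal, self-contained sketch of the control flow that the deleted handler implemented. The classes below are simplified stand-ins for the real scheduler's `TaskState`/`WorkerState`, and the function returns the recommended transition instead of applying it:

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class WorkerState:
    address: str


@dataclass
class TaskState:
    key: str
    state: str = "memory"
    run_spec: object = None  # "recipe" to recompute the task, if any
    who_has: set = field(default_factory=set)


def handle_missing_data_sketch(tasks, workers, key, errant_worker):
    """Drop the stale replica; recommend a transition if none remain."""
    ts = tasks.get(key)
    ws = workers.get(errant_worker)
    if not ts or not ws or ws not in ts.who_has:
        return None  # stale or unknown message: nothing to do
    ts.who_has.discard(ws)  # the real code calls Scheduler.remove_replica
    if ts.state == "memory" and not ts.who_has:
        # No replica left anywhere: recompute the task if we know how,
        # otherwise forget it entirely.
        return "released" if ts.run_spec else "forgotten"
    return None


# Example: the errant worker held the only replica of a recomputable task.
w = WorkerState("tcp://w1:1234")
t = TaskState("x", run_spec=object(), who_has={w})
assert handle_missing_data_sketch({"x": t}, {w.address: w}, "x", w.address) == "released"
```
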
distributed/tests/test_cancelled_state.py: 0 additions & 13 deletions

@@ -101,19 +101,6 @@ def f(ev):
     )


-@gen_cluster(client=True)
-async def test_worker_find_missing(c, s, a, b):
-    fut = c.submit(inc, 1, workers=[a.address])
-    await fut
-    # We do not want to use the proper API since it would ensure that the
-    # cluster is informed properly
-    del a.data[fut.key]
-    del a.tasks[fut.key]
-
-    # Actually no worker has the data; the scheduler is supposed to reschedule
-    assert await c.submit(inc, fut, workers=[b.address]) == 3
-
-
 @gen_cluster(client=True)
 async def test_worker_stream_died_during_comm(c, s, a, b):
     write_queue = asyncio.Queue()
distributed/tests/test_stories.py: 2 additions & 3 deletions

@@ -138,8 +138,7 @@ async def test_worker_story_with_deps(c, s, a, b):
     # Story now includes randomized stimulus_ids and timestamps.
     story = b.story("res")
     stimulus_ids = {ev[-2].rsplit("-", 1)[0] for ev in story}
-    assert stimulus_ids == {"compute-task", "task-finished"}
-
+    assert stimulus_ids == {"compute-task", "gather-dep-success", "task-finished"}
     # This is a simple transition log
     expected = [
         ("res", "compute-task", "released"),
@@ -153,7 +152,7 @@

     story = b.story("dep")
     stimulus_ids = {ev[-2].rsplit("-", 1)[0] for ev in story}
-    assert stimulus_ids == {"compute-task"}
+    assert stimulus_ids == {"compute-task", "gather-dep-success"}
     expected = [
         ("dep", "ensure-task-exists", "released"),
         ("dep", "released", "fetch", "fetch", {}),
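
As context for the assertions above: worker stories end each event tuple with (stimulus_id, timestamp), and stimulus IDs carry a randomized suffix. The `rsplit("-", 1)[0]` idiom strips that suffix so the test can compare deterministic prefixes. A small illustration, where the event tuple and the suffix format are made up for the example:

```python
# A story event ends with (stimulus_id, timestamp); stimulus IDs look like
# "<name>-<random token>", e.g. "gather-dep-success-1b2c3d4e" (illustrative).
event = ("dep", "flight", "memory", "memory", {}, "gather-dep-success-1b2c3d4e", 1654857600.0)
stimulus_id = event[-2]
prefix = stimulus_id.rsplit("-", 1)[0]  # drop everything after the last "-"
assert prefix == "gather-dep-success"
```
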
distributed/tests/test_worker.py: 3 additions & 1 deletion

@@ -2925,7 +2925,6 @@ async def test_who_has_consistent_remove_replicas(c, s, *workers):
     coming_from.handle_stimulus(RemoveReplicasEvent(keys=[f1.key], stimulus_id="test"))
     await f2

-    assert_story(a.story(f1.key), [(f1.key, "missing-dep")])
     assert a.tasks[f1.key].suspicious_count == 0
     assert s.tasks[f1.key].suspicious == 0

@@ -3129,6 +3128,7 @@ async def test_gather_dep_cancelled_rescheduled(c, s):
     for in-flight state. The response parser, however, did not distinguish,
     resulting in unwanted missing-data signals to the scheduler, causing
     potential rescheduling or data leaks.
+    (Note: missing-data was removed in #6445.)

     If a cancelled key is rescheduled for fetching while gather_dep waits
     internally for get_data, the response parser would misclassify this key and
@@ -3177,6 +3177,8 @@ async def test_gather_dep_do_not_handle_response_of_not_requested_tasks(c, s, a):
     for in-flight state. The response parser, however, did not distinguish,
     resulting in unwanted missing-data signals to the scheduler, causing
     potential rescheduling or data leaks.
+    (Note: missing-data was removed in #6445.)
+
     This test may become obsolete if the implementation changes significantly.
     """
     async with BlockedGatherDep(s.address) as b:
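
The two docstrings above describe the same underlying hazard: a response arriving for a key that was cancelled or rescheduled while the request was in flight must not be treated as missing data. A hypothetical sketch of that classification rule; `classify_response` and its bucket names are illustrative, not the actual gather_dep implementation:

```python
def classify_response(requested: set, response_data: dict, cancelled: set) -> dict:
    """Sort each requested key into a bucket after a gather round-trip."""
    buckets = {}
    for key in requested:
        if key in cancelled:
            # Key was released/cancelled mid-flight; its absence from the
            # response must not produce a missing-data signal.
            buckets[key] = "cancelled"
        elif key in response_data:
            buckets[key] = "success"
        else:
            buckets[key] = "missing"  # peer genuinely does not hold the key
    return buckets


requested = {"x", "y", "z"}
resp = {"x": 1}
assert classify_response(requested, resp, cancelled={"y"}) == {
    "x": "success", "y": "cancelled", "z": "missing"
}
```
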
distributed/tests/test_worker_state_machine.py: 35 additions & 0 deletions

@@ -647,3 +647,38 @@ async def test_fetch_to_missing_on_refresh_who_has(c, s, w1, w2, w3):
     assert w3.tasks["x"].state == "missing"
     assert w3.tasks["y"].state == "flight"
     assert w3.tasks["y"].who_has == {w2.address}
+
+
+@gen_cluster(client=True, nthreads=[("", 1)])
+async def test_fetch_to_missing_on_network_failure(c, s, a):
+    """
+    1. Two tasks, x and y, are respectively in flight and fetch state from the
+       same worker, which holds the only replica of both.
+    2. gather_dep for x returns GatherDepNetworkFailureEvent.
+    3. The event empties has_what, x.who_has, and y.who_has.
+    4. The same event invokes _ensure_communicating, which pops y from
+       data_needed - but y has an empty who_has, which is an exceptional
+       situation. _ensure_communicating recommends a transition to missing
+       for x.
+    5. The fetch->missing transition is executed, but y is no longer in
+       data_needed - another exceptional situation.
+    """
+    block_get_data = asyncio.Event()
+
+    class BlockedBreakingWorker(Worker):
+        async def get_data(self, comm, *args, **kwargs):
+            await block_get_data.wait()
+            raise OSError("fake error")
+
+    async with BlockedBreakingWorker(s.address) as b:
+        x = c.submit(inc, 1, key="x", workers=[b.address])
+        y = c.submit(inc, 2, key="y", workers=[b.address])
+        await wait([x, y])
+        s.request_acquire_replicas(a.address, ["x"], stimulus_id="test_x")
+        await wait_for_state("x", "flight", a)
+        s.request_acquire_replicas(a.address, ["y"], stimulus_id="test_y")
+        await wait_for_state("y", "fetch", a)
+
+        block_get_data.set()
+
+        await wait_for_state("x", "missing", a)
+        await wait_for_state("y", "missing", a)