Commit 10031c8

crusaderky committed on Jun 9, 2022
1 parent 039db5f
Showing 3 changed files with 26 additions and 58 deletions.
distributed/tests/test_worker.py (0 additions, 1 deletion)

@@ -2925,7 +2925,6 @@ async def test_who_has_consistent_remove_replicas(c, s, *workers):
     coming_from.handle_stimulus(RemoveReplicasEvent(keys=[f1.key], stimulus_id="test"))
     await f2
 
-    assert_story(a.story(f1.key), [(f1.key, "missing-dep")])
     assert a.tasks[f1.key].suspicious_count == 0
     assert s.tasks[f1.key].suspicious == 0

distributed/tests/test_worker_state_machine.py (5 additions, 6 deletions)

@@ -655,12 +655,11 @@ async def test_fetch_to_missing_on_network_failure(c, s, a):
     1. Two tasks, x and y, are respectively in flight and fetch state from the same
        worker, which holds the only replica of both.
     2. gather_dep for x returns GatherDepNetworkFailureEvent
-    3. The event empties has_what, x.who_has, and y.who_has; it recommends a transition
-       to missing for both x and y.
-    5. Before the recommendation can be implemented, the same event invokes
-       _ensure_communicating, which pops y from data_needed - but y has an empty
-       who_has, which is an exceptional situation.
-    6. The fetch->missing transition is executed, but y is no longer in data_needed -
+    3. The event empties has_what, x.who_has, and y.who_has.
+    4. The same event invokes _ensure_communicating, which pops y from data_needed
+       - but y has an empty who_has, which is an exceptional situation.
+       _ensure_communicating recommends a transition to missing for y.
+    5. The fetch->missing transition is executed, but y is no longer in data_needed -
        another exceptional situation.
     """
     block_get_data = asyncio.Event()
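To make the sequence above concrete, here is a self-contained toy model of the two exceptional situations. Plain Python sets and dicts only; none of these names are distributed's real classes or APIs:

# Toy state: y is a fetch-state task whose who_has was just emptied by the
# network-failure event; x (in flight) is handled elsewhere.
data_needed = {"y"}
who_has = {"y": set()}
recommendations = {}

def ensure_communicating():
    # Exceptional situation 1: y is popped despite its empty who_has,
    # and is recommended for a fetch->missing transition.
    while data_needed:
        ts = data_needed.pop()
        if not who_has[ts]:
            recommendations[ts] = "missing"

def transition_fetch_missing(ts):
    # Exceptional situation 2: y is no longer in data_needed, so a plain
    # remove() would raise KeyError; discard() tolerates the double pop.
    data_needed.discard(ts)

ensure_communicating()
for ts, rec in list(recommendations.items()):
    if rec == "missing":
        transition_fetch_missing(ts)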
distributed/worker.py (21 additions, 51 deletions)

@@ -2187,13 +2187,7 @@ def transition_fetch_flight(
     def transition_fetch_missing(
         self, ts: TaskState, *, stimulus_id: str
     ) -> RecsInstrs:
-        # There's a use case where ts won't be found in self.data_needed, so
-        # `self.data_needed.remove(ts)` would crash:
-        # 1. An event handler empties who_has and pushes a recommendation to missing
-        # 2. The same event handler calls _ensure_communicating, which pops the task
-        #    from data_needed
-        # 3. The recommendation is enacted
-        # See matching code in _ensure_communicating.
+        # _ensure_communicating could have just popped this task out of data_needed
         self.data_needed.discard(ts)
         return self.transition_generic_missing(ts, stimulus_id=stimulus_id)
 
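The one-line comment relies on set semantics worth spelling out: set.remove() raises KeyError for an absent element, while set.discard() is a no-op. A standalone check, using a builtin set as a stand-in for the real data_needed collection:

data_needed = {"y"}
data_needed.discard("y")   # popped once, e.g. by _ensure_communicating
data_needed.discard("y")   # second removal during the transition: no-op
try:
    data_needed.remove("y")
except KeyError:
    print("remove() would have crashed the transition")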
@@ -3019,11 +3013,7 @@ def _ensure_communicating(self, *, stimulus_id: str) -> RecsInstrs:
                 assert self.address not in ts.who_has
 
             if not ts.who_has:
-                # An event handler just emptied who_has and recommended a fetch->missing
-                # transition. Then, the same handler called _ensure_communicating. The
-                # transition hasn't been enacted yet, so the task is still in fetch
-                # state and in data_needed.
-                # See matching code in transition_fetch_missing.
+                recommendations[ts] = "missing"

crusaderky (Author) commented on Jun 9, 2022:
This isn't a "just in case". GatherDepNetworkFailureEvent does not recommend transitioning from fetch to missing anymore.

                 continue
 
             workers = [
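A condensed, hypothetical sketch of the loop this hunk sits in — illustrative names only, not the real Worker internals — showing how an empty who_has is now rerouted to a missing recommendation here, rather than relying on the event handler to do it:

def ensure_communicating(data_needed, who_has):
    recommendations = {}
    while data_needed:
        key = data_needed.pop()
        if not who_has[key]:
            # the event handler only emptied who_has; the missing
            # recommendation is issued here
            recommendations[key] = "missing"
            continue
        # normal path: pick a holder and start a transfer (elided)
        recommendations[key] = ("flight", sorted(who_has[key])[0])
    return recommendations

print(ensure_communicating(["y"], {"y": set()}))  # {'y': 'missing'}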
@@ -3369,48 +3359,26 @@ def _gather_dep_done_common(self, ev: GatherDepDoneEvent) -> Iterator[TaskState]
             ts.done = True
             yield ts
 
-    def _refetch_missing_data(
-        self, worker: str, tasks: Iterable[TaskState], stimulus_id: str
-    ) -> RecsInstrs:
-        """Helper of GatherDepSuccessEvent and GatherDepNetworkFailureEvent handlers.
-        Remove tasks that were not returned from the peer worker from has_what and
-        inform the scheduler with a missing-data message. Then, transition them back to
-        'fetch' so that they can be fetched from another worker.
-        """
-        recommendations: Recs = {}
-        instructions: Instructions = []
-
-        for ts in tasks:
-            ts.who_has.discard(worker)
-            self.has_what[worker].discard(ts.key)
-            self.log.append((ts.key, "missing-dep", stimulus_id, time()))
-            if ts.state in ("flight", "resumed", "cancelled"):
-                # This will actually transition to missing if who_has is empty
-                recommendations[ts] = "fetch"
-            elif ts.state == "fetch":
-                self.data_needed_per_worker[worker].discard(ts)
-                if not ts.who_has:
-                    recommendations[ts] = "missing"
-
-        return recommendations, instructions
-
     @_handle_event.register
     def _handle_gather_dep_success(self, ev: GatherDepSuccessEvent) -> RecsInstrs:
         """gather_dep terminated successfully.
         The response may contain less keys than the request.
         """
         recommendations: Recs = {}
-        refetch = set()
         for ts in self._gather_dep_done_common(ev):
             if ts.key in ev.data:
                 recommendations[ts] = ("memory", ev.data[ts.key])
             else:
-                refetch.add(ts)
+                self.log.append((ts.key, "missing-dep", ev.stimulus_id, time()))
+                if self.validate:
+                    assert ts.state != "fetch"
+                    assert ts not in self.data_needed_per_worker[ev.worker]
crusaderky (Author) commented on Jun 9, 2022:
Otherwise _select_keys_for_gather will fail later

+                ts.who_has.discard(ev.worker)
+                self.has_what[ev.worker].discard(ts.key)
+                recommendations[ts] = "fetch"
 
         return merge_recs_instructions(
             (recommendations, []),
-            self._refetch_missing_data(ev.worker, refetch, ev.stimulus_id),
             self._ensure_communicating(stimulus_id=ev.stimulus_id),
         )
 
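A toy version of the handler above — a hypothetical helper over plain dicts, not the real API: keys returned by the peer go to memory, while keys the peer no longer holds lose that replica immediately and are re-recommended for fetch:

def handle_gather_dep_success(requested, data, who_has, worker):
    recommendations = {}
    for key in requested:
        if key in data:
            recommendations[key] = ("memory", data[key])
        else:
            # drop the replica we believed this worker had, so that the next
            # fetch attempt targets a different worker
            who_has[key].discard(worker)
            recommendations[key] = "fetch"
    return recommendations

who_has = {"x": {"w1"}, "y": {"w1", "w2"}}
recs = handle_gather_dep_success(["x", "y"], {"x": 1}, who_has, "w1")
assert recs == {"x": ("memory", 1), "y": "fetch"}
assert who_has["y"] == {"w2"}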
@@ -3461,17 +3429,20 @@ def _handle_gather_dep_network_failure(
         either retry a different worker, or ask the scheduler to inform us of a new
         worker if no other worker is available.
         """
-        refetch = set(self._gather_dep_done_common(ev))
-        refetch |= {self.tasks[key] for key in self.has_what[ev.worker]}
+        self.data_needed_per_worker.pop(ev.worker)
+        for key in self.has_what.pop(ev.worker):
+            ts = self.tasks[key]
+            ts.who_has.discard(ev.worker)
 
-        recs, instrs = merge_recs_instructions(
-            self._refetch_missing_data(ev.worker, refetch, ev.stimulus_id),
+        recommendations: Recs = {}
+        for ts in self._gather_dep_done_common(ev):
+            self.log.append((ts.key, "missing-dep", ev.stimulus_id, time()))
+            recommendations[ts] = "fetch"
+
+        return merge_recs_instructions(
+            (recommendations, []),
             self._ensure_communicating(stimulus_id=ev.stimulus_id),
         )
-        # This cleanup must happen after _refetch_missing_data
-        del self.has_what[ev.worker]
-        del self.data_needed_per_worker[ev.worker]
-        return recs, instrs
 
     @_handle_event.register
     def _handle_gather_dep_failure(self, ev: GatherDepFailureEvent) -> RecsInstrs:
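The reordering in this hunk can be sketched as follows — toy structures and illustrative names, not the real implementation: purge the per-worker bookkeeping first, then recommend fetch for the gathered tasks, which degrades to missing for any task left without replicas:

def handle_network_failure(worker, has_what, who_has, gathered_keys):
    # 1. purge all bookkeeping for the unreachable worker
    for key in has_what.pop(worker, set()):
        who_has[key].discard(worker)
    # 2. recommend a new fetch; with no replicas left this degrades to
    #    missing (in the real code the fetch transition itself does this)
    recommendations = {}
    for key in gathered_keys:
        recommendations[key] = "fetch" if who_has[key] else "missing"
    return recommendations

has_what = {"w1": {"x", "y"}}
who_has = {"x": {"w1"}, "y": {"w1", "w2"}}
print(handle_network_failure("w1", has_what, who_has, ["x", "y"]))
# {'x': 'missing', 'y': 'fetch'}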
@@ -4266,8 +4237,7 @@ def validate_task_fetch(self, ts):
         assert self.address not in ts.who_has
         assert not ts.done
         assert ts in self.data_needed

crusaderky (Author) commented on Jun 9, 2022:
This assertion is still valid, as validate_task_state is only called at the end of a transitions cycle. It's only in transition_fetch_missing, which runs in the middle of a cycle, that it does not hold.

-        assert ts.who_has
-
+        # Note: ts.who_has may be empty; see GatherDepNetworkFailureEvent
         for w in ts.who_has:
             assert ts.key in self.has_what[w]
             assert ts in self.data_needed_per_worker[w]
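The timing argument in the comment above reduces to a small sketch — a hypothetical invariant over plain sets, not the real validate_task_state: a cross-collection invariant may be briefly violated inside a transition, yet hold whenever validation actually runs at the end of the cycle:

state = {"fetch_tasks": {"y"}, "data_needed": {"y"}}

def validate(s):
    # invariant: every fetch-state task is tracked in data_needed
    assert s["fetch_tasks"] <= s["data_needed"]

def transition_fetch_missing(s, key):
    s["data_needed"].discard(key)
    # mid-cycle: validate(s) would fail right here, since y is still a
    # fetch-state task but no longer in data_needed
    s["fetch_tasks"].discard(key)

validate(state)
transition_fetch_missing(state, "y")
validate(state)  # end of cycle: the invariant holds again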
