Skip to content

Commit

Permalink
Refactor release key (#5507)
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter committed Nov 18, 2021
1 parent e955b8a commit 1721d62
Showing 1 changed file with 30 additions and 25 deletions.
55 changes: 30 additions & 25 deletions distributed/worker.py
Expand Up @@ -652,7 +652,7 @@ def __init__(
("resumed", "waiting"): self.transition_rescheduled_next,
("resumed", "fetch"): self.transition_rescheduled_next,
("constrained", "executing"): self.transition_constrained_executing,
("constrained", "released"): self.transition_constrained_released,
("constrained", "released"): self.transition_generic_released,
("error", "released"): self.transition_generic_released,
("executing", "error"): self.transition_executing_error,
("executing", "long-running"): self.transition_executing_long_running,
Expand Down Expand Up @@ -1811,7 +1811,7 @@ def handle_free_keys(self, comm=None, keys=None, stimulus_id=None):
for key in keys:
ts = self.tasks.get(key)
if ts:
recommendations[ts] = "released" if ts.dependents else "forgotten"
recommendations[ts] = "released"

self.transitions(recommendations, stimulus_id=stimulus_id)

Expand Down Expand Up @@ -1844,7 +1844,7 @@ def handle_remove_replicas(self, keys, stimulus_id):
continue
if not ts.is_protected():
self.log.append((ts.key, "remove-replica-confirmed", stimulus_id))
recommendations[ts] = "released" if ts.dependents else "forgotten"
recommendations[ts] = "released"
else:
rejected.append(key)

Expand Down Expand Up @@ -2023,9 +2023,11 @@ def transition_missing_fetch(self, ts, *, stimulus_id):

def transition_missing_released(self, ts, *, stimulus_id):
self._missing_dep_flight.discard(ts)
recommendations = self.release_key(ts.key, reason="missing->released")
recommendations, smsgs = self.transition_generic_released(
ts, stimulus_id=stimulus_id
)
assert ts.key in self.tasks
return recommendations, []
return recommendations, smsgs

def transition_fetch_missing(self, ts, *, stimulus_id):
# handle_missing will append to self.data_needed if new workers are found
Expand All @@ -2042,7 +2044,18 @@ def transition_released_fetch(self, ts, *, stimulus_id):
return {}, []

def transition_generic_released(self, ts, *, stimulus_id):
recs = self.release_key(ts.key, reason=stimulus_id)
self.release_key(ts.key, reason=stimulus_id)
recs = {}
for dependency in ts.dependencies:
if (
not dependency.waiters
and dependency.state not in READY | PROCESSING | {"memory"}
):
recs[dependency] = "released"

if not ts.dependents:
recs[ts] = "forgotten"

return recs, []

def transition_released_waiting(self, ts, *, stimulus_id):
Expand Down Expand Up @@ -2082,8 +2095,8 @@ def transition_fetch_flight(self, ts, worker, *, stimulus_id):
return {}, []

def transition_memory_released(self, ts, *, stimulus_id):
recs = self.release_key(ts.key, reason=stimulus_id)
smsgs = [{"op": "release-worker-data", "key": ts.key}]
recs, smsgs = self.transition_generic_released(ts, stimulus_id=stimulus_id)
smsgs.append({"op": "release-worker-data", "key": ts.key})
return recs, smsgs

def transition_waiting_constrained(self, ts, *, stimulus_id):
Expand Down Expand Up @@ -2195,9 +2208,9 @@ def transition_executing_error(

def transition_rescheduled_next(self, ts, *, stimulus_id):
next_state = ts._next
recs = self.release_key(ts.key, reason=stimulus_id)
recs, smsgs = self.transition_generic_released(ts, stimulus_id=stimulus_id)
recs[ts] = next_state
return recs, []
return recs, smsgs

def transition_cancelled_fetch(self, ts, *, stimulus_id):
if ts.done:
Expand Down Expand Up @@ -2240,9 +2253,11 @@ def transition_cancelled_released(self, ts, *, stimulus_id):

for resource, quantity in ts.resource_restrictions.items():
self.available_resources[resource] += quantity
recommendations = self.release_key(ts.key, reason=stimulus_id)
recommendations[ts] = next_state or "released"
return recommendations, []
recommendations, smsgs = self.transition_generic_released(
ts, stimulus_id=stimulus_id
)
recommendations[ts] = next_state or recommendations.get(ts, "released")
return recommendations, smsgs

def transition_executing_released(self, ts, *, stimulus_id):
ts._previous = ts.state
Expand Down Expand Up @@ -2288,10 +2303,6 @@ def transition_executing_memory(self, ts, value=no_value, *, stimulus_id):
self.executed_count += 1
return self.transition_generic_memory(ts, value=value, stimulus_id=stimulus_id)

def transition_constrained_released(self, ts, *, stimulus_id):
recs = self.release_key(ts.key, reason=stimulus_id)
return recs, []

def transition_constrained_executing(self, ts, *, stimulus_id):
if self.validate:
assert not ts.waiting_for_data
Expand Down Expand Up @@ -3073,16 +3084,15 @@ def handle_steal_request(self, key, stimulus_id):
# If task is marked as "constrained" we haven't yet assigned it an
# `available_resources` to run on, that happens in
# `transition_constrained_executing`
self.transition(ts, "forgotten", stimulus_id=stimulus_id)
self.transition(ts, "released", stimulus_id=stimulus_id)

def release_key(
self,
key: str,
cause: TaskState | None = None,
reason: str | None = None,
report: bool = True,
):
recommendations = {}
) -> None:
try:
if self.validate:
assert not isinstance(key, TaskState)
Expand Down Expand Up @@ -3122,9 +3132,6 @@ def release_key(
ts.waiting_for_data.discard(d)
d.waiters.discard(ts)

if not d.waiters and d.state in {"flight", "fetch", "missing"}:
recommendations[d] = "forgotten"

ts.waiting_for_data.clear()
ts.nbytes = None
ts._previous = None
Expand All @@ -3148,8 +3155,6 @@ def release_key(
pdb.set_trace()
raise

return recommendations

################
# Execute Task #
################
Expand Down

0 comments on commit 1721d62

Please sign in to comment.