Worker State Machine breaks on fetch->missing + _ensure_communicating
crusaderky committed Jun 8, 2022
1 parent de0adb0 commit 7b7024f
Showing 2 changed files with 98 additions and 2 deletions.
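In short: when an event handler both empties a task's who_has and calls _ensure_communicating before its fetch->missing recommendation is enacted, the task has already been popped from data_needed by the time the transition runs. The sketch below reproduces the crash in miniature; all names are simplified stand-ins (the real data_needed is a heap of TaskState objects, not a set of keys):

# Hypothetical, stripped-down model of the race; not the real Worker API.
data_needed = {"x"}         # tasks in "fetch" state, waiting for a transfer slot
who_has = {"x": set()}      # an event handler just emptied x's replicas

# The same handler then calls _ensure_communicating, which pops tasks from
# data_needed to schedule transfers; x has nothing to fetch and stays popped.
key = data_needed.pop()
assert not who_has[key]

# The handler's fetch->missing recommendation is enacted only afterwards:
try:
    data_needed.remove(key)  # before this commit: raises KeyError
except KeyError:
    pass
data_needed.discard(key)     # after this commit: a safe no-op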
82 changes: 82 additions & 0 deletions distributed/tests/test_worker_state_machine.py
@@ -23,6 +23,7 @@
ExecuteSuccessEvent,
Instruction,
RecommendationsConflict,
RefreshWhoHasEvent,
ReleaseWorkerDataMsg,
RescheduleEvent,
RescheduleMsg,
@@ -603,3 +604,84 @@ async def test_missing_to_waiting(c, s, w1, w2, w3):
await w1.close()

await f1


@gen_cluster(client=True, nthreads=[("", 1)] * 3)
async def test_fetch_to_missing_on_refresh_who_has(c, s, w1, w2, w3):
"""
1. Two tasks, x and y, are available only on a busy worker.
The fetching worker sends request-refresh-who-has to the scheduler.
2. The scheduler responds that x has become missing, while y has gained an
additional replica.
3. The handler for RefreshWhoHasEvent empties x.who_has and recommends a transition
to missing.
4. Before the recommendation can be enacted, the same event invokes
_ensure_communicating to allow y to transition to flight. This in turn pops x from
data_needed - but x has an empty who_has, which is an exceptional situation.
5. The transition fetch->missing is executed, but x is no longer in
data_needed - another exceptional situation.
"""
x = c.submit(inc, 1, key="x", workers=[w1.address])
y = c.submit(inc, 2, key="y", workers=[w1.address])
await wait([x, y])
w1.total_in_connections = 0
s.request_acquire_replicas(w3.address, ["x", "y"], stimulus_id="test1")

# The tasks will now flip-flop between fetch and flight every 150ms
# (see Worker.retry_busy_worker_later)
await wait_for_state("x", "fetch", w3)
await wait_for_state("y", "fetch", w3)
assert w1.address in w3.busy_workers
# w3 sent {op: request-refresh-who-has, keys: [x, y]}
# There may also have been enough time for a refresh-who-has message to come back,
# which reiterated what w3 already knew:
# {op: refresh-who-has, who_has={x: [w1.address], y: [w1.address]}}

# Let's instead simulate that, while request-refresh-who-has was in transit,
# w2 gained a replica of y and subsequently w1 closed down.
# When request-refresh-who-has lands, the scheduler will respond:
# {op: refresh-who-has, who_has={x: [], y: [w2.address]}}
w3.handle_stimulus(
RefreshWhoHasEvent(who_has={"x": set(), "y": {w2.address}}, stimulus_id="test3")
)
assert w3.tasks["x"].state == "missing"
assert w3.tasks["y"].state == "flight"
assert w3.tasks["y"].who_has == {w2.address}


@gen_cluster(client=True, nthreads=[("", 1)])
async def test_fetch_to_missing_on_network_failure(c, s, a):
"""
1. Multiple tasks are to be fetched from the same worker.
2. Only some transition to flight, while the others remain in fetch state, e.g.
because of excessive size.
3. gather_dep returns GatherDepNetworkFailureEvent.
4. The event empties has_what. This impacts both the tasks in fetch and those in
flight. The event recommends a transition to missing for all tasks with empty
who_has.
5. Before the recommendation can be enacted, the same event invokes
_ensure_communicating, which pops the tasks in fetch from data_needed - but they
have an empty who_has, which is an exceptional situation.
6. The transition fetch->missing is executed, but the tasks are no longer in
data_needed - another exceptional situation.
"""
block_get_data = asyncio.Event()

class BlockedBreakingWorker(Worker):
async def get_data(self, comm, *args, **kwargs):
await block_get_data.wait()
raise OSError("fake error")

async with BlockedBreakingWorker(s.address) as b:
x = c.submit(inc, 1, key="x", workers=[b.address])
y = c.submit(inc, 2, key="y", workers=[b.address])
await wait([x, y])
s.request_acquire_replicas(a.address, ["x"], stimulus_id="test_x")
await wait_for_state("x", "flight", a)
s.request_acquire_replicas(a.address, ["y"], stimulus_id="test_y")
await wait_for_state("y", "fetch", a)

block_get_data.set()

await wait_for_state("x", "missing", a)
await wait_for_state("y", "missing", a)
18 changes: 16 additions & 2 deletions distributed/worker.py
@@ -2168,7 +2168,14 @@ def transition_fetch_flight(
def transition_fetch_missing(
self, ts: TaskState, *, stimulus_id: str
) -> RecsInstrs:
self.data_needed.remove(ts)
# There's a use case where ts won't be found in self.data_needed, so
# `self.data_needed.remove(ts)` would crash:
# 1. An event handler empties who_has and pushes a recommendation to missing
# 2. The same event handler calls _ensure_communicating, which pops the task
# from data_needed
# 3. The recommendation is enacted
# See matching code in _ensure_communicating.
self.data_needed.discard(ts)
return self.transition_generic_missing(ts, stimulus_id=stimulus_id)

def transition_memory_released(
@@ -2990,9 +2997,16 @@ def _ensure_communicating(self, *, stimulus_id: str) -> RecsInstrs:

if self.validate:
assert ts.state == "fetch"
assert ts.who_has
assert self.address not in ts.who_has

if not ts.who_has:
# An event handler just emptied who_has and recommended a fetch->missing
# transition. Then, the same handler called _ensure_communicating. The
# transition hasn't been enacted yet, so the task is still in fetch
# state and in data_needed.
# See matching code in transition_fetch_missing.
continue

workers = [
w
for w in ts.who_has
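The two comment blocks added to worker.py describe the same race from both ends: the handler first queues a fetch->missing recommendation, then _ensure_communicating pops the task from data_needed, and only then is the recommendation enacted. A toy, runnable model of that ordering follows; the class and method names are hypothetical, not the real WorkerState internals:

class MiniWorkerState:
    """Toy model of the handler -> recommendations -> transitions pipeline."""

    def __init__(self):
        self.data_needed = {"x"}     # tasks in fetch state
        self.who_has = {"x": set()}  # replicas just emptied by an event
        self.state = {"x": "fetch"}

    def handle_event(self):
        recs = {"x": "missing"}       # 1. recommend fetch->missing
        self._ensure_communicating()  # 2. pops x from data_needed
        self._transitions(recs)       # 3. recommendation enacted last

    def _ensure_communicating(self):
        for key in list(self.data_needed):
            self.data_needed.discard(key)  # pop the task from the collection
            if not self.who_has[key]:
                # Nothing to fetch; the pending fetch->missing recommendation
                # will deal with this task. See the `continue` added above.
                continue
            ...  # schedule a transfer (not reached for x)

    def _transitions(self, recs):
        for key, state in recs.items():
            if state == "missing":
                # discard, not remove: x is already gone from data_needed
                self.data_needed.discard(key)
                self.state[key] = state


ws = MiniWorkerState()
ws.handle_event()
assert ws.state["x"] == "missing"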