Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure reconnecting workers do not lose required data #5436

Merged
Merged 3 commits on Oct 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 13 additions & 10 deletions distributed/scheduler.py
Expand Up @@ -2737,8 +2737,7 @@ def transition_processing_memory(

if ws != ts._processing_on: # someone else has this task
logger.info(
"Unexpected worker completed task, likely due to "
"work stealing. Expected: %s, Got: %s, Key: %s",
"Unexpected worker completed task. Expected: %s, Got: %s, Key: %s",
ts._processing_on,
ws,
key,
Expand Down Expand Up @@ -2835,7 +2834,7 @@ def transition_memory_released(self, key, safe: bint = False):
worker_msg = {
"op": "free-keys",
"keys": [key],
"reason": f"Memory->Released {key}",
"stimulus_id": f"memory-released-{time()}",
}
for ws in ts._who_has:
worker_msgs[ws._address] = [worker_msg]
Expand Down Expand Up @@ -2935,7 +2934,11 @@ def transition_erred_released(self, key):
if dts._state == "erred":
recommendations[dts._key] = "waiting"

w_msg = {"op": "free-keys", "keys": [key], "reason": "Erred->Released"}
w_msg = {
"op": "free-keys",
"keys": [key],
"stimulus_id": f"erred-released-{time()}",
}
for ws_addr in ts._erred_on:
worker_msgs[ws_addr] = [w_msg]
ts._erred_on.clear()
Expand Down Expand Up @@ -3013,7 +3016,7 @@ def transition_processing_released(self, key):
{
"op": "free-keys",
"keys": [key],
"reason": f"processing-released-{time()}",
"stimulus_id": f"processing-released-{time()}",
}
]

Expand Down Expand Up @@ -4339,9 +4342,9 @@ async def add_worker(
worker_msgs[address] = []
worker_msgs[address].append(
{
"op": "free-keys",
"op": "remove-replicas",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the relevant change

"keys": already_released_keys,
"reason": f"reconnect-already-released-{time()}",
"stimulus_id": f"reconnect-already-released-{time()}",
}
)
for ts in list(parent._unrunnable):
Expand Down Expand Up @@ -4767,7 +4770,7 @@ def stimulus_task_finished(self, key=None, worker=None, **kwargs):
{
"op": "free-keys",
"keys": [key],
"reason": f"already-released-or-forgotten-{time()}",
"stimulus_id": f"already-released-or-forgotten-{time()}",
}
]
elif ts._state == "memory":
Expand Down Expand Up @@ -5965,7 +5968,7 @@ async def delete_worker_data(
await retry_operation(
self.rpc(addr=worker_address).free_keys,
keys=list(keys),
reason="rebalance/replicate",
stimulus_id=f"delete-data-{time()}",
)
except OSError as e:
# This can happen e.g. if the worker is going through controlled shutdown;
Expand Down Expand Up @@ -7846,7 +7849,7 @@ def _propagate_forgotten(
{
"op": "free-keys",
"keys": [key],
"reason": f"propagate-forgotten {ts.key}",
"stimulus_id": f"propagate-forgotten-{time()}",
}
]
state.remove_all_replicas(ts)
Expand Down
2 changes: 1 addition & 1 deletion distributed/tests/test_failed_workers.py
Expand Up @@ -488,7 +488,7 @@ def sink(*args):
# artificially, without notifying the scheduler.
# This can only succeed if B handles the missing data properly by
# removing A from the known sources of keys
a.handle_free_keys(keys=["f1"], reason="Am I evil?") # Yes, I am!
a.handle_free_keys(keys=["f1"], stimulus_id="Am I evil?") # Yes, I am!
result_fut = c.submit(sink, futures, workers=x.address)

await result_fut
Expand Down
2 changes: 0 additions & 2 deletions distributed/tests/test_worker.py
Expand Up @@ -2444,7 +2444,6 @@ async def test_hold_on_to_replicas(c, s, *workers):
await asyncio.sleep(0.01)


@pytest.mark.flaky(reruns=10, reruns_delay=5)
@gen_cluster(client=True)
async def test_worker_reconnects_mid_compute(c, s, a, b):
"""Ensure that, if a worker disconnects while computing a result, the scheduler will
Expand Down Expand Up @@ -2513,7 +2512,6 @@ def fast_on_a(lock):
await asyncio.sleep(0.001)


@pytest.mark.flaky(reruns=10, reruns_delay=5)
@gen_cluster(client=True)
async def test_worker_reconnects_mid_compute_multiple_states_on_scheduler(c, s, a, b):
"""
Expand Down
6 changes: 3 additions & 3 deletions distributed/worker.py
Expand Up @@ -1596,7 +1596,7 @@ def update_data(
self.batched_stream.send(msg)
return {"nbytes": {k: sizeof(v) for k, v in data.items()}, "status": "OK"}

def handle_free_keys(self, comm=None, keys=None, reason=None):
def handle_free_keys(self, comm=None, keys=None, stimulus_id=None):
"""
Handler to be called by the scheduler.
Expand All @@ -1607,14 +1607,14 @@ def handle_free_keys(self, comm=None, keys=None, reason=None):
still decide to hold on to the data and task since it is required by an
upstream dependency.
"""
self.log.append(("free-keys", keys, reason))
self.log.append(("free-keys", keys, stimulus_id))
recommendations = {}
for key in keys:
ts = self.tasks.get(key)
if ts:
recommendations[ts] = "released" if ts.dependents else "forgotten"

self.transitions(recommendations, stimulus_id=reason)
self.transitions(recommendations, stimulus_id=stimulus_id)

def handle_remove_replicas(self, keys, stimulus_id):
"""Stream handler notifying the worker that it might be holding unreferenced,
Expand Down