From ce1d0f1db7323ce5e3f513205fdf2fee24a57e67 Mon Sep 17 00:00:00 2001
From: James Bourbeau
Date: Fri, 15 Oct 2021 09:06:35 -0500
Subject: [PATCH 1/3] Remove zict-related skips (#5429)

---
 distributed/spill.py                | 7 ++-----
 distributed/tests/test_scheduler.py | 2 --
 distributed/tests/test_spill.py     | 2 --
 3 files changed, 2 insertions(+), 9 deletions(-)

diff --git a/distributed/spill.py b/distributed/spill.py
index 5d90361d170..2c849c2447e 100644
--- a/distributed/spill.py
+++ b/distributed/spill.py
@@ -4,14 +4,11 @@
 from functools import partial
 from typing import Any
 
+from zict import Buffer, File, Func
+
 from .protocol import deserialize_bytes, serialize_bytelist
 from .sizeof import safe_sizeof
 
-try:
-    from zict import Buffer, File, Func
-except ImportError:
-    raise ImportError("Please `python -m pip install zict` for spill-to-disk workers")
-
 
 class SpillBuffer(Buffer):
     """MutableMapping that automatically spills out dask key/value pairs to disk when
diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index 7d26e3e7904..b222e09feed 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -2473,8 +2473,6 @@ async def assert_memory(scheduler_or_workerstate, attr: str, min_, max_, timeout
     client=True, Worker=Nanny, worker_kwargs={"memory_limit": "500 MiB"}, timeout=120
 )
 async def test_memory(c, s, *_):
-    pytest.importorskip("zict")
-
     # WorkerState objects, as opposed to the Nanny objects passed by gen_cluster
     a, b = s.workers.values()
 
diff --git a/distributed/tests/test_spill.py b/distributed/tests/test_spill.py
index 8735f1acbdf..d013b141158 100644
--- a/distributed/tests/test_spill.py
+++ b/distributed/tests/test_spill.py
@@ -1,7 +1,5 @@
 import pytest
 
-pytest.importorskip("zict")
-
 from dask.sizeof import sizeof
 
 from distributed.spill import SpillBuffer

From 6a0217e4d7ac442f9c9be018baab37ae3ec90cb9 Mon Sep 17 00:00:00 2001
From: crusaderky
Date: Fri, 15 Oct 2021 18:22:57 +0100
Subject: [PATCH 2/3] Mark test_gather_dep* as xfail (#5432)

* Mark test_gather_dep as flaky

* xfail
---
 distributed/tests/test_worker.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py
index a92700b053c..93d391f6ef2 100644
--- a/distributed/tests/test_worker.py
+++ b/distributed/tests/test_worker.py
@@ -3089,6 +3089,7 @@ async def _wait_for_flight(key, worker):
         await asyncio.sleep(0)
 
 
+@pytest.mark.xfail(reason="#5406")
 @gen_cluster(client=True)
 async def test_gather_dep_do_not_handle_response_of_not_requested_tasks(c, s, a, b):
     """At time of writing, the gather_dep implementation filtered tasks again
@@ -3122,6 +3123,7 @@ async def test_gather_dep_do_not_handle_response_of_not_requested_tasks(c, s, a,
     await fut3
 
 
+@pytest.mark.xfail(reason="#5406")
 @gen_cluster(
     client=True,
     config={

From 7d2516a6d752594e8e24cf530fcd2dcefacf7c17 Mon Sep 17 00:00:00 2001
From: Florian Jetter
Date: Mon, 18 Oct 2021 19:04:04 +0200
Subject: [PATCH 3/3] Ensure reconnecting workers do not lose required data (#5436)

---
 distributed/scheduler.py                 | 23 +++++++++++++----------
 distributed/tests/test_failed_workers.py |  2 +-
 distributed/tests/test_worker.py         |  2 --
 distributed/worker.py                    |  6 +++---
 4 files changed, 17 insertions(+), 16 deletions(-)

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 481220a97e5..fd873469af9 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -2737,8 +2737,7 @@ def transition_processing_memory(
         if ws != ts._processing_on:  # someone else has this task
             logger.info(
-                "Unexpected worker completed task, likely due to "
-                "work stealing. Expected: %s, Got: %s, Key: %s",
+                "Unexpected worker completed task. Expected: %s, Got: %s, Key: %s",
                 ts._processing_on,
                 ws,
                 key,
             )
@@ -2835,7 +2834,7 @@ def transition_memory_released(self, key, safe: bint = False):
             worker_msg = {
                 "op": "free-keys",
                 "keys": [key],
-                "reason": f"Memory->Released {key}",
+                "stimulus_id": f"memory-released-{time()}",
             }
             for ws in ts._who_has:
                 worker_msgs[ws._address] = [worker_msg]
@@ -2935,7 +2934,11 @@ def transition_erred_released(self, key):
             if dts._state == "erred":
                 recommendations[dts._key] = "waiting"
 
-        w_msg = {"op": "free-keys", "keys": [key], "reason": "Erred->Released"}
+        w_msg = {
+            "op": "free-keys",
+            "keys": [key],
+            "stimulus_id": f"erred-released-{time()}",
+        }
         for ws_addr in ts._erred_on:
             worker_msgs[ws_addr] = [w_msg]
         ts._erred_on.clear()
@@ -3013,7 +3016,7 @@ def transition_processing_released(self, key):
                 {
                     "op": "free-keys",
                     "keys": [key],
-                    "reason": f"processing-released-{time()}",
+                    "stimulus_id": f"processing-released-{time()}",
                 }
             ]
 
@@ -4339,9 +4342,9 @@ async def add_worker(
                 worker_msgs[address] = []
             worker_msgs[address].append(
                 {
-                    "op": "free-keys",
+                    "op": "remove-replicas",
                     "keys": already_released_keys,
-                    "reason": f"reconnect-already-released-{time()}",
+                    "stimulus_id": f"reconnect-already-released-{time()}",
                 }
             )
         for ts in list(parent._unrunnable):
@@ -4767,7 +4770,7 @@ def stimulus_task_finished(self, key=None, worker=None, **kwargs):
                 {
                     "op": "free-keys",
                     "keys": [key],
-                    "reason": f"already-released-or-forgotten-{time()}",
+                    "stimulus_id": f"already-released-or-forgotten-{time()}",
                 }
             ]
         elif ts._state == "memory":
@@ -5965,7 +5968,7 @@ async def delete_worker_data(
             await retry_operation(
                 self.rpc(addr=worker_address).free_keys,
                 keys=list(keys),
-                reason="rebalance/replicate",
+                stimulus_id=f"delete-data-{time()}",
             )
         except OSError as e:
             # This can happen e.g. if the worker is going through controlled shutdown;
@@ -7846,7 +7849,7 @@ def _propagate_forgotten(
                 {
                     "op": "free-keys",
                     "keys": [key],
-                    "reason": f"propagate-forgotten {ts.key}",
+                    "stimulus_id": f"propagate-forgotten-{time()}",
                 }
             ]
             state.remove_all_replicas(ts)
diff --git a/distributed/tests/test_failed_workers.py b/distributed/tests/test_failed_workers.py
index 8e5d01167d0..2b6d5149a2a 100644
--- a/distributed/tests/test_failed_workers.py
+++ b/distributed/tests/test_failed_workers.py
@@ -488,7 +488,7 @@ def sink(*args):
     # artificially, without notifying the scheduler.
     # This can only succeed if B handles the missing data properly by
     # removing A from the known sources of keys
-    a.handle_free_keys(keys=["f1"], reason="Am I evil?")  # Yes, I am!
+    a.handle_free_keys(keys=["f1"], stimulus_id="Am I evil?")  # Yes, I am!
     result_fut = c.submit(sink, futures, workers=x.address)
 
     await result_fut
diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py
index 93d391f6ef2..81e25d8d039 100644
--- a/distributed/tests/test_worker.py
+++ b/distributed/tests/test_worker.py
@@ -2444,7 +2444,6 @@ async def test_hold_on_to_replicas(c, s, *workers):
         await asyncio.sleep(0.01)
 
 
-@pytest.mark.flaky(reruns=10, reruns_delay=5)
 @gen_cluster(client=True)
 async def test_worker_reconnects_mid_compute(c, s, a, b):
     """Ensure that, if a worker disconnects while computing a result, the scheduler will
@@ -2513,7 +2512,6 @@ def fast_on_a(lock):
         await asyncio.sleep(0.001)
 
 
-@pytest.mark.flaky(reruns=10, reruns_delay=5)
 @gen_cluster(client=True)
 async def test_worker_reconnects_mid_compute_multiple_states_on_scheduler(c, s, a, b):
     """
diff --git a/distributed/worker.py b/distributed/worker.py
index 88e82d5915a..ebd62e9cd9a 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -1596,7 +1596,7 @@ def update_data(
             self.batched_stream.send(msg)
         return {"nbytes": {k: sizeof(v) for k, v in data.items()}, "status": "OK"}
 
-    def handle_free_keys(self, comm=None, keys=None, reason=None):
+    def handle_free_keys(self, comm=None, keys=None, stimulus_id=None):
         """
         Handler to be called by the scheduler.
 
@@ -1607,14 +1607,14 @@ def handle_free_keys(self, comm=None, keys=None, reason=None):
         still decide to hold on to the data and task since it is required by an
         upstream dependency.
         """
-        self.log.append(("free-keys", keys, reason))
+        self.log.append(("free-keys", keys, stimulus_id))
         recommendations = {}
        for key in keys:
             ts = self.tasks.get(key)
             if ts:
                 recommendations[ts] = "released" if ts.dependents else "forgotten"
 
-        self.transitions(recommendations, stimulus_id=reason)
+        self.transitions(recommendations, stimulus_id=stimulus_id)
 
     def handle_remove_replicas(self, keys, stimulus_id):
         """Stream handler notifying the worker that it might be holding unreferenced,
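
Note on patch 1: spill-to-disk support is built directly on zict, which is now
an unconditional import. For readers who have not used zict, the sketch below
shows how primitives like those imported in distributed/spill.py compose into a
spilling mapping. It is a minimal illustration, not SpillBuffer itself: pickle
stands in for distributed's serialize_bytelist/deserialize_bytes, and the
1 MB target, weight function, and directory name are arbitrary choices.

# Minimal sketch of a zict-based spill buffer (assumes `pip install zict`).
import pickle
import sys
import tempfile

from zict import Buffer, File, Func

spill_dir = tempfile.mkdtemp(prefix="spill-demo-")

# Slow mapping: each value is pickled and written to one file per key.
slow = Func(pickle.dumps, pickle.loads, File(spill_dir))

# Fast mapping: a plain in-memory dict. Buffer moves items into `slow` once
# the cumulative weight of `fast` exceeds the target n (bytes here).
fast = {}
buf = Buffer(fast, slow, n=1_000_000, weight=lambda k, v: sys.getsizeof(v))

buf["small"] = 1               # lightweight: stays in the in-memory dict
buf["big"] = b"x" * 2_000_000  # heavier than the target: written to disk
print(sorted(fast), sorted(slow))  # expected: ['small'] ['big']
print(len(buf["big"]))             # read back transparently: 2000000

SpillBuffer subclasses Buffer and layers Dask's own serialization
(serialize_bytelist/deserialize_bytes, visible in the imports above) onto the
same composition.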
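Note on patch 3: every free-keys message now carries a machine-readable
stimulus_id instead of a free-form reason, and a reconnecting worker is sent
remove-replicas rather than free-keys for keys the scheduler already released,
so it drops only surplus copies rather than data it still needs. The toy below
re-creates the worker-side rule from the handle_free_keys hunk outside the
real Worker class; ToyWorker, ToyTaskState, and the returned recommendations
dict are illustrative stand-ins (the real handler feeds the recommendations to
self.transitions()).

from dataclasses import dataclass, field
from time import time


@dataclass
class ToyTaskState:
    key: str
    dependents: set = field(default_factory=set)


class ToyWorker:
    def __init__(self):
        self.tasks = {}  # key -> ToyTaskState
        self.log = []

    def handle_free_keys(self, comm=None, keys=None, stimulus_id=None):
        # Mirrors the patched handler: record the stimulus_id, then recommend
        # a transition per task, keeping data that dependents still need.
        self.log.append(("free-keys", keys, stimulus_id))
        recommendations = {}
        for key in keys:
            ts = self.tasks.get(key)
            if ts:
                recommendations[ts.key] = "released" if ts.dependents else "forgotten"
        return recommendations


w = ToyWorker()
w.tasks["x"] = ToyTaskState("x", dependents={"y"})
w.tasks["z"] = ToyTaskState("z")
print(w.handle_free_keys(keys=["x", "z"], stimulus_id=f"demo-{time()}"))
# -> {'x': 'released', 'z': 'forgotten'}

The timestamped stimulus_id (e.g. f"processing-released-{time()}") lets a
worker's log entries be correlated with the scheduler transition that caused
them, which the old free-form reason strings did not guarantee.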