
Commit

Merge branch 'main' into AMM/avoid_paused
crusaderky committed Oct 18, 2021
2 parents fdb5e4c + 7d2516a commit 5698018
Showing 7 changed files with 21 additions and 25 deletions.
23 changes: 13 additions & 10 deletions distributed/scheduler.py
@@ -2751,8 +2751,7 @@ def transition_processing_memory(

         if ws != ts._processing_on:  # someone else has this task
             logger.info(
-                "Unexpected worker completed task, likely due to "
-                "work stealing. Expected: %s, Got: %s, Key: %s",
+                "Unexpected worker completed task. Expected: %s, Got: %s, Key: %s",
                 ts._processing_on,
                 ws,
                 key,
@@ -2849,7 +2848,7 @@ def transition_memory_released(self, key, safe: bint = False):
         worker_msg = {
             "op": "free-keys",
             "keys": [key],
-            "reason": f"Memory->Released {key}",
+            "stimulus_id": f"memory-released-{time()}",
         }
         for ws in ts._who_has:
             worker_msgs[ws._address] = [worker_msg]
@@ -2949,7 +2948,11 @@ def transition_erred_released(self, key):
             if dts._state == "erred":
                 recommendations[dts._key] = "waiting"

-        w_msg = {"op": "free-keys", "keys": [key], "reason": "Erred->Released"}
+        w_msg = {
+            "op": "free-keys",
+            "keys": [key],
+            "stimulus_id": f"erred-released-{time()}",
+        }
         for ws_addr in ts._erred_on:
             worker_msgs[ws_addr] = [w_msg]
         ts._erred_on.clear()
@@ -3027,7 +3030,7 @@ def transition_processing_released(self, key):
                 {
                     "op": "free-keys",
                     "keys": [key],
-                    "reason": f"processing-released-{time()}",
+                    "stimulus_id": f"processing-released-{time()}",
                 }
             ]

@@ -4364,9 +4367,9 @@ async def add_worker(
                     worker_msgs[address] = []
                 worker_msgs[address].append(
                     {
-                        "op": "free-keys",
+                        "op": "remove-replicas",
                         "keys": already_released_keys,
-                        "reason": f"reconnect-already-released-{time()}",
+                        "stimulus_id": f"reconnect-already-released-{time()}",
                     }
                 )

@@ -4793,7 +4796,7 @@ def stimulus_task_finished(self, key=None, worker=None, **kwargs):
                 {
                     "op": "free-keys",
                     "keys": [key],
-                    "reason": f"already-released-or-forgotten-{time()}",
+                    "stimulus_id": f"already-released-or-forgotten-{time()}",
                 }
             ]
         elif ts._state == "memory":
@@ -6018,7 +6021,7 @@ async def delete_worker_data(
             await retry_operation(
                 self.rpc(addr=worker_address).free_keys,
                 keys=list(keys),
-                reason="rebalance/replicate",
+                stimulus_id=f"delete-data-{time()}",
             )
         except OSError as e:
             # This can happen e.g. if the worker is going through controlled shutdown;
@@ -7899,7 +7902,7 @@ def _propagate_forgotten(
            {
                "op": "free-keys",
                "keys": [key],
-               "reason": f"propagate-forgotten {ts.key}",
+               "stimulus_id": f"propagate-forgotten-{time()}",
            }
        ]
    state.remove_all_replicas(ts)
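Apart from the add_worker hunk, which also switches the op to "remove-replicas", every scheduler.py hunk above applies the same pattern: the free-form `reason` string in a worker-bound message is replaced by a `stimulus_id` embedding the event name and a timestamp. A minimal sketch of the resulting message shape (the helper `make_free_keys_msg` is hypothetical; scheduler.py builds these dicts inline):

    from time import time

    def make_free_keys_msg(keys: list, event: str) -> dict:
        # Mirrors the dicts built inline in scheduler.py above: the
        # stimulus_id ties the worker-side transitions back to the
        # scheduler event (and instant) that triggered them.
        return {
            "op": "free-keys",
            "keys": keys,
            "stimulus_id": f"{event}-{time()}",
        }

    # e.g. transition_memory_released now sends the equivalent of:
    worker_msg = make_free_keys_msg(["x"], "memory-released")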
7 changes: 2 additions & 5 deletions distributed/spill.py
@@ -4,14 +4,11 @@
 from functools import partial
 from typing import Any

+from zict import Buffer, File, Func
+
 from .protocol import deserialize_bytes, serialize_bytelist
 from .sizeof import safe_sizeof

-try:
-    from zict import Buffer, File, Func
-except ImportError:
-    raise ImportError("Please `python -m pip install zict` for spill-to-disk workers")
-

 class SpillBuffer(Buffer):
     """MutableMapping that automatically spills out dask key/value pairs to disk when
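With the try/except gone, zict is imported unconditionally, making it in effect a hard dependency of the worker; this is also why the `pytest.importorskip("zict")` guards disappear from the tests below. A minimal usage sketch of the class being touched (the constructor arguments are assumed from this version of spill.py, not shown in the hunk):

    from distributed.spill import SpillBuffer

    # Assumed signature: SpillBuffer(spill_directory, target), with target
    # a byte count; values stay in memory until the target is exceeded,
    # then the least recently used ones are written to disk transparently.
    buf = SpillBuffer("/tmp/dask-spill", target=100_000_000)
    buf["x"] = 1  # a small value stays in memory; large ones spill to disk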
2 changes: 1 addition & 1 deletion distributed/tests/test_failed_workers.py
@@ -488,7 +488,7 @@ def sink(*args):
     # artificially, without notifying the scheduler.
     # This can only succeed if B handles the missing data properly by
     # removing A from the known sources of keys
-    a.handle_free_keys(keys=["f1"], reason="Am I evil?")  # Yes, I am!
+    a.handle_free_keys(keys=["f1"], stimulus_id="Am I evil?")  # Yes, I am!
     result_fut = c.submit(sink, futures, workers=x.address)

     await result_fut
2 changes: 0 additions & 2 deletions distributed/tests/test_scheduler.py
@@ -2476,8 +2476,6 @@ async def assert_memory(scheduler_or_workerstate, attr: str, min_, max_, timeout
     client=True, Worker=Nanny, worker_kwargs={"memory_limit": "500 MiB"}, timeout=120
 )
 async def test_memory(c, s, *_):
-    pytest.importorskip("zict")
-
     # WorkerState objects, as opposed to the Nanny objects passed by gen_cluster
     a, b = s.workers.values()

2 changes: 0 additions & 2 deletions distributed/tests/test_spill.py
@@ -1,7 +1,5 @@
 import pytest

-pytest.importorskip("zict")
-
 from dask.sizeof import sizeof

 from distributed.spill import SpillBuffer
4 changes: 2 additions & 2 deletions distributed/tests/test_worker.py
@@ -2444,7 +2444,6 @@ async def test_hold_on_to_replicas(c, s, *workers):
         await asyncio.sleep(0.01)


-@pytest.mark.flaky(reruns=10, reruns_delay=5)
 @gen_cluster(client=True)
 async def test_worker_reconnects_mid_compute(c, s, a, b):
     """Ensure that, if a worker disconnects while computing a result, the scheduler will
@@ -2513,7 +2512,6 @@ def fast_on_a(lock):
         await asyncio.sleep(0.001)


-@pytest.mark.flaky(reruns=10, reruns_delay=5)
 @gen_cluster(client=True)
 async def test_worker_reconnects_mid_compute_multiple_states_on_scheduler(c, s, a, b):
     """
@@ -3089,6 +3087,7 @@ async def _wait_for_flight(key, worker):
         await asyncio.sleep(0)


+@pytest.mark.xfail(reason="#5406")
 @gen_cluster(client=True)
 async def test_gather_dep_do_not_handle_response_of_not_requested_tasks(c, s, a, b):
     """At time of writing, the gather_dep implementation filtered tasks again
@@ -3122,6 +3121,7 @@ async def test_gather_dep_do_not_handle_response_of_not_requested_tasks(c, s, a, b):
     await fut3


+@pytest.mark.xfail(reason="#5406")
 @gen_cluster(
     client=True,
     config={
6 changes: 3 additions & 3 deletions distributed/worker.py
@@ -1600,7 +1600,7 @@ def update_data(
             self.batched_stream.send(msg)
         return {"nbytes": {k: sizeof(v) for k, v in data.items()}, "status": "OK"}

-    def handle_free_keys(self, comm=None, keys=None, reason=None):
+    def handle_free_keys(self, comm=None, keys=None, stimulus_id=None):
         """
         Handler to be called by the scheduler.
@@ -1611,14 +1611,14 @@ def handle_free_keys(self, comm=None, keys=None, stimulus_id=None):
         still decide to hold on to the data and task since it is required by an
         upstream dependency.
         """
-        self.log.append(("free-keys", keys, reason))
+        self.log.append(("free-keys", keys, stimulus_id))
         recommendations = {}
         for key in keys:
             ts = self.tasks.get(key)
             if ts:
                 recommendations[ts] = "released" if ts.dependents else "forgotten"

-        self.transitions(recommendations, stimulus_id=reason)
+        self.transitions(recommendations, stimulus_id=stimulus_id)

     def handle_remove_replicas(self, keys, stimulus_id):
         """Stream handler notifying the worker that it might be holding unreferenced,
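The renamed keyword travels end to end: the scheduler's `free_keys` RPC now passes `stimulus_id=` (see the delete_worker_data hunk), `handle_free_keys` records it in the worker log, and the same identifier is handed to the transition engine. A sketch of the worker-side effect, assuming a running Worker instance `w` (the values are illustrative):

    # Scheduler side, as in delete_worker_data above:
    #     self.rpc(addr=worker_address).free_keys(
    #         keys=list(keys), stimulus_id=f"delete-data-{time()}"
    #     )
    # Worker side, where that call is dispatched:
    w.handle_free_keys(keys=["x"], stimulus_id="delete-data-1634567890.0")
    # -> w.log gains ("free-keys", ["x"], "delete-data-1634567890.0")
    # -> w.transitions(recommendations, stimulus_id="delete-data-1634567890.0")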
