Skip to content

Commit

Permalink
fixup! Use enum for worker state name
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter committed Oct 20, 2021
1 parent d14728a commit c7e6e8b
Showing 1 changed file with 32 additions and 31 deletions.
63 changes: 32 additions & 31 deletions distributed/diagnostics/tests/test_worker_plugin.py
Expand Up @@ -4,6 +4,7 @@

from distributed import Worker, WorkerPlugin
from distributed.utils_test import async_wait_for, gen_cluster, inc
from distributed.worker import WTSName


class MyPlugin(WorkerPlugin):
Expand Down Expand Up @@ -106,12 +107,12 @@ async def test_create_on_construction(c, s, a, b):
@gen_cluster(nthreads=[("127.0.0.1", 1)], client=True)
async def test_normal_task_transitions_called(c, s, w):
expected_notifications = [
{"key": "task", "start": "released", "finish": "waiting"},
{"key": "task", "start": "waiting", "finish": "ready"},
{"key": "task", "start": "ready", "finish": "executing"},
{"key": "task", "start": "executing", "finish": "memory"},
{"key": "task", "start": "memory", "finish": "released"},
{"key": "task", "start": "released", "finish": "forgotten"},
{"key": "task", "start": WTSName.released, "finish": WTSName.waiting},
{"key": "task", "start": WTSName.waiting, "finish": WTSName.ready},
{"key": "task", "start": WTSName.ready, "finish": WTSName.executing},
{"key": "task", "start": WTSName.executing, "finish": WTSName.memory},
{"key": "task", "start": WTSName.memory, "finish": WTSName.released},
{"key": "task", "start": WTSName.released, "finish": WTSName.forgotten},
]

plugin = MyPlugin(1, expected_notifications=expected_notifications)
Expand All @@ -127,12 +128,12 @@ def failing(x):
raise Exception()

expected_notifications = [
{"key": "task", "start": "released", "finish": "waiting"},
{"key": "task", "start": "waiting", "finish": "ready"},
{"key": "task", "start": "ready", "finish": "executing"},
{"key": "task", "start": "executing", "finish": "error"},
{"key": "task", "start": "error", "finish": "released"},
{"key": "task", "start": "released", "finish": "forgotten"},
{"key": "task", "start": WTSName.released, "finish": WTSName.waiting},
{"key": "task", "start": WTSName.waiting, "finish": WTSName.ready},
{"key": "task", "start": WTSName.ready, "finish": WTSName.executing},
{"key": "task", "start": WTSName.executing, "finish": WTSName.error},
{"key": "task", "start": WTSName.error, "finish": WTSName.released},
{"key": "task", "start": WTSName.released, "finish": WTSName.forgotten},
]

plugin = MyPlugin(1, expected_notifications=expected_notifications)
Expand All @@ -148,12 +149,12 @@ def failing(x):
)
async def test_superseding_task_transitions_called(c, s, w):
expected_notifications = [
{"key": "task", "start": "released", "finish": "waiting"},
{"key": "task", "start": "waiting", "finish": "constrained"},
{"key": "task", "start": "constrained", "finish": "executing"},
{"key": "task", "start": "executing", "finish": "memory"},
{"key": "task", "start": "memory", "finish": "released"},
{"key": "task", "start": "released", "finish": "forgotten"},
{"key": "task", "start": WTSName.released, "finish": WTSName.waiting},
{"key": "task", "start": WTSName.waiting, "finish": WTSName.constrained},
{"key": "task", "start": WTSName.constrained, "finish": WTSName.executing},
{"key": "task", "start": WTSName.executing, "finish": WTSName.memory},
{"key": "task", "start": WTSName.memory, "finish": WTSName.released},
{"key": "task", "start": WTSName.released, "finish": WTSName.forgotten},
]

plugin = MyPlugin(1, expected_notifications=expected_notifications)
Expand All @@ -168,18 +169,18 @@ async def test_dependent_tasks(c, s, w):
dsk = {"dep": 1, "task": (inc, "dep")}

expected_notifications = [
{"key": "dep", "start": "released", "finish": "waiting"},
{"key": "dep", "start": "waiting", "finish": "ready"},
{"key": "dep", "start": "ready", "finish": "executing"},
{"key": "dep", "start": "executing", "finish": "memory"},
{"key": "task", "start": "released", "finish": "waiting"},
{"key": "task", "start": "waiting", "finish": "ready"},
{"key": "task", "start": "ready", "finish": "executing"},
{"key": "task", "start": "executing", "finish": "memory"},
{"key": "dep", "start": "memory", "finish": "released"},
{"key": "task", "start": "memory", "finish": "released"},
{"key": "task", "start": "released", "finish": "forgotten"},
{"key": "dep", "start": "released", "finish": "forgotten"},
{"key": "dep", "start": WTSName.released, "finish": WTSName.waiting},
{"key": "dep", "start": WTSName.waiting, "finish": WTSName.ready},
{"key": "dep", "start": WTSName.ready, "finish": WTSName.executing},
{"key": "dep", "start": WTSName.executing, "finish": WTSName.memory},
{"key": "task", "start": WTSName.released, "finish": WTSName.waiting},
{"key": "task", "start": WTSName.waiting, "finish": WTSName.ready},
{"key": "task", "start": WTSName.ready, "finish": WTSName.executing},
{"key": "task", "start": WTSName.executing, "finish": WTSName.memory},
{"key": "dep", "start": WTSName.memory, "finish": WTSName.released},
{"key": "task", "start": WTSName.memory, "finish": WTSName.released},
{"key": "task", "start": WTSName.released, "finish": WTSName.forgotten},
{"key": "dep", "start": WTSName.released, "finish": WTSName.forgotten},
]

plugin = MyPlugin(1, expected_notifications=expected_notifications)
Expand Down Expand Up @@ -215,7 +216,7 @@ def __init__(self):
def release_key(self, key, state, cause, reason, report):
# Ensure that the handler still works
self._called = True
assert state == "memory"
assert state == WTSName.memory
assert key == "task"

def teardown(self, worker):
Expand Down

0 comments on commit c7e6e8b

Please sign in to comment.