Ignore deps of root-likes. This is working well.
When a task is root-like and the previous worker is full, we don't want to use the normal `decide_worker` logic, since that only considers workers that already hold the task's dependencies as candidates. Since the task only has 1-5 deps, we'd only ever consider the same 1-5 workers. (A simplified sketch of the heuristic follows the commit metadata below.)
gjoseph92 committed Jun 16, 2021
1 parent 7b9728f commit 0fbb75e
Showing 1 changed file with 68 additions and 48 deletions.
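The heuristic this commit leans on can be summarized with the following standalone sketch. It is not the scheduler's actual code: `GroupSketch`, `assign_root_like`, and the `pick_new_worker` callable are illustrative stand-ins for the `TaskGroup` bookkeeping (`_last_worker`, `_last_worker_tasks_left`) and for the candidate-selection fallback in the diff below.

import math
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class GroupSketch:
    # Illustrative stand-in for the TaskGroup bookkeeping used in the diff.
    n_tasks: int                       # len(group)
    n_group_deps: int                  # sum(map(len, group._dependencies))
    last_worker: Optional[str] = None
    last_worker_tasks_left: int = 0


def assign_root_like(
    group: GroupSketch, total_nthreads: int, pick_new_worker: Callable[[], str]
) -> str:
    # Root-like: the group is larger than the cluster but depends on only a
    # handful of tasks in total.
    tasks_per_worker = group.n_tasks / total_nthreads
    is_root_like = tasks_per_worker > 1 and group.n_group_deps < 5

    if (
        is_root_like
        and group.last_worker is not None
        and group.last_worker_tasks_left > 0
    ):
        # Previous worker still has room in its share: keep siblings together.
        group.last_worker_tasks_left -= 1
        return group.last_worker

    # No previous worker, or it is full: pick a fresh one and reserve roughly
    # one fair share of the group for it.
    ws = pick_new_worker()
    group.last_worker = ws
    group.last_worker_tasks_left = math.floor(tasks_per_worker)
    return ws


# 16 sibling tasks on 4 total threads: consecutive tasks reuse a worker until
# its share runs out, then move on to the next one.
workers = iter(["w1", "w2", "w3", "w4", "w5"])
group = GroupSketch(n_tasks=16, n_group_deps=1)
print([assign_root_like(group, 4, lambda: next(workers)) for _ in range(16)])

In the real diff, the "pick a fresh worker" step is the dependency-aware candidate selection further down, the reservation size is `math.floor(group_tasks_per_worker)`, and a `_last_worker_priority` check flags calls that arrive out of priority order.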
116 changes: 68 additions & 48 deletions distributed/scheduler.py
@@ -2399,7 +2399,7 @@ def decide_worker(self, ts: TaskState) -> WorkerState:
ws = wp_vals[self._n_tasks % n_workers]

# TODO repeated logic from `decide_worker`
print(f"nodeps / no last worker fastpah - {ts.group_key}")
print(f"nodeps / no last worker fastpah - {ts.group_key} -> {ws.name}")
ts._group._last_worker = ws
ts._group._last_worker_tasks_left = math.floor(
len(ts._group) / self._total_nthreads
@@ -7532,53 +7532,28 @@ def decide_worker(
of bytes sent between workers. This is determined by calling the
*objective* function.
"""
ws: WorkerState = None
wws: WorkerState
dts: TaskState
deps: set = ts._dependencies
candidates: set
assert all([dts._who_has for dts in deps])
if ts._actor:
candidates = set(all_workers)
else:
candidates = {wws for dts in deps for wws in dts._who_has}
if valid_workers is None:
if not candidates:
candidates = set(all_workers)
else:
candidates &= valid_workers
if not candidates:
candidates = valid_workers
if not candidates:
if ts._loose_restrictions:
ws = decide_worker(ts, all_workers, None, objective, total_nthreads)
return ws

ncandidates: Py_ssize_t = len(candidates)
if ncandidates == 0:
pass
elif ncandidates == 1:
for ws in candidates:
break
else:
group: TaskGroup = ts._group
ws = group._last_worker
group: TaskGroup = ts._group
ws: WorkerState = group._last_worker

if valid_workers is not None:
total_nthreads = sum(wws._nthreads for wws in candidates)
if valid_workers is not None:
total_nthreads = sum(wws._nthreads for wws in valid_workers)

group_tasks_per_worker = len(group) / total_nthreads
group_tasks_per_worker = len(group) / total_nthreads
ignore_deps_while_picking: bool = False

# Try to schedule sibling root-like tasks on the same workers, so subsequent reduction tasks
# don't require data transfer. Assumes `decide_worker` is being called in priority order.
if (
ws is not None # there is a previous worker
and group._last_worker_tasks_left > 0 # previous worker not fully assigned
and group_tasks_per_worker > 1 # group is larger than cluster
and ( # is a root-like task (task group depends on very few tasks)
sum(map(len, group._dependencies)) < 5 # TODO what number
)
):
# Try to schedule sibling root-like tasks on the same workers, so subsequent reduction tasks
# don't require data transfer. Assumes `decide_worker` is being called in priority order.
if (
ws is not None # there is a previous worker
and group_tasks_per_worker > 1 # group is larger than cluster
and ( # is a root-like task (task group is large, but depends on very few tasks)
sum(map(len, group._dependencies)) < 5 # TODO what number
)
):
if group._last_worker_tasks_left > 0:
# Previous worker not fully assigned
group._last_worker_tasks_left -= 1
if group._last_worker_priority >= ts.priority:
print(
@@ -7589,15 +7564,60 @@
f"{group_tasks_per_worker=}\n"
)
group._last_worker_priority = ts.priority
print(f"reusing worker - {ts.group_key}")
print(f"reusing worker - {ts.group_key} -> {ws.name}")
return ws

print(f"picking worker - {ts.group_key}")
# Previous worker is fully assigned, so pick a new worker.
# Since this is a root-like task, we should ignore the placement of its dependencies while selecting workers.
# Every worker is going to end up running this type of task eventually, and any dependencies will have to be
# transferred to all workers, so there's no gain from only considering workers where the dependencies already live.
# Indeed, we _must_ consider all workers, otherwise we would keep picking the same "new" worker(s) every time,
# since there are only N workers to choose from that actually have the dependency (where N <= n_deps).
ignore_deps_while_picking = True
print(f"{ts.group_key} is root-like but {ws.name} is full - picking a new worker")

# Not a root-like task; pick the best worker among the valid workers
# that hold at least one dependency of this task.
deps: set = ts._dependencies
dts: TaskState
candidates: set
assert all([dts._who_has for dts in deps])
if ignore_deps_while_picking:
candidates = valid_workers if valid_workers is not None else set(all_workers)
else:
if ts._actor:
candidates = set(all_workers)
else:
candidates = {wws for dts in deps for wws in dts._who_has}
if valid_workers is None:
if not candidates:
candidates = set(all_workers)
else:
candidates &= valid_workers
if not candidates:
candidates = valid_workers
if not candidates:
if ts._loose_restrictions:
ws = decide_worker(ts, all_workers, None, objective, total_nthreads)
return ws

ncandidates: Py_ssize_t = len(candidates)
if ncandidates == 0:
print(f"no candidates - {ts.group_key}")
pass
elif ncandidates == 1:
# NOTE: this is the ideal case: all the deps are already on the same worker.
# We did a good job in previous `decide_worker`s!
for ws in candidates:
break
print(f"1 candidate - {ts.group_key} -> {ws.name}")
else:
ws = min(candidates, key=objective)
group._last_worker = ws
group._last_worker_tasks_left = math.floor(group_tasks_per_worker)
group._last_worker_priority = ts.priority
print(f"picked worker - {ts.group_key} -> {ws.name}")

group._last_worker = ws
group._last_worker_tasks_left = math.floor(group_tasks_per_worker)
group._last_worker_priority = ts.priority
return ws
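
For the second half of the diff, the comments about ignoring dependency placement boil down to the candidate-selection rule sketched here. This is a simplified standalone restatement, not the scheduler's API: actor tasks and the `loose_restrictions` retry are omitted, plain string worker names stand in for `WorkerState`, and `pick_candidates` is an illustrative name.

from typing import Optional, Set


def pick_candidates(
    dep_holders: Set[str],              # workers that already hold this task's deps
    all_workers: Set[str],
    valid_workers: Optional[Set[str]],  # None means "no worker restrictions"
    ignore_deps: bool,                  # True: root-like task whose previous worker is full
) -> Set[str]:
    # When deps are ignored, every (valid) worker is a candidate: the few
    # dependencies of a root-like group will be copied everywhere anyway, and
    # restricting to dep holders would keep re-picking the same N <= n_deps workers.
    if ignore_deps:
        return set(valid_workers) if valid_workers is not None else set(all_workers)

    # Otherwise prefer workers already holding a dependency, narrowed by restrictions.
    candidates = set(dep_holders)
    if valid_workers is None:
        return candidates or set(all_workers)
    candidates &= valid_workers
    return candidates or set(valid_workers)

When more than one candidate survives, the worker is then chosen by `min(candidates, key=objective)`, as in the final `else` branch above.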

