From c9613fec9f6b912c35d8bce5e598af84da1f7045 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Tue, 12 Apr 2022 16:08:52 -0500 Subject: [PATCH] Allow stealing of fast tasks in some situations Previously we avoided the stealing of fast tasks in some situations. It was generally considered a bad idea to spend non-trivial time in order to move around a 1ms task. However, sometimes it's not this task that matters, but the tasks that it unblocks. Being strict about not stealing may not always be ideal. This commit relaxes stealing of fast tasks in three ways: 1. It sets a minimum task duration of 5ms, which is roughly the overhead in a real system. 2. It changes the network latency variable in stealing.py from 100ms to 10ms. 3. It no longer completely rules out very fast tasks from stealing, but instead lets them compete based on their compute/transfer ratio (which should ordinarily be terrible; their compute time is clamped to a minimum of 10ms for this comparison). In cases where transfer times are trivial, stealing such tasks becomes viable again. Tests don't pass yet; this is up for comments. --- distributed/scheduler.py | 2 +- distributed/stealing.py | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index ddf1e1ad2b..ebcce8a802 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3441,7 +3441,7 @@ def get_task_duration(self, ts: TaskState) -> double: """ duration: double = ts._prefix._duration_average if duration >= 0: - return duration + return max(0.005, duration) s: set = self._unknown_durations.get(ts._prefix._name) # type: ignore if s is None: diff --git a/distributed/stealing.py b/distributed/stealing.py index 54ef0098c6..e056d905d6 100644 --- a/distributed/stealing.py +++ b/distributed/stealing.py @@ -23,7 +23,7 @@ # submission which may include code serialization. 
Therefore, be very # conservative in the latency estimation to suppress too aggressive stealing # of small tasks -LATENCY = 0.1 +LATENCY = 0.01 logger = logging.getLogger(__name__) @@ -199,8 +199,7 @@ def steal_time_ratio(self, ts): ws = ts.processing_on compute_time = ws.processing[ts] - if compute_time < 0.005: # 5ms, just give up - return None, None + compute_time = max(compute_time, 0.010) nbytes = ts.get_nbytes_deps() transfer_time = nbytes / self.scheduler.bandwidth + LATENCY