From 1ca495c3d7a34181a2c8f9e743cbef78dc63c490 Mon Sep 17 00:00:00 2001 From: Benoit Person Date: Tue, 22 Jun 2021 08:31:04 +0000 Subject: [PATCH] Fix tasks in an infinite slots pool were never scheduled (#15247) Infinite pools: Make their `total_slots` be `inf` instead of `-1` (cherry picked from commit 96f764389eded9f1ea908e899b54bf00635ec787) --- airflow/models/pool.py | 6 +++++- tests/jobs/test_scheduler_job.py | 35 ++++++++++++++++++++++++++++++++ tests/models/test_pool.py | 4 ++-- 3 files changed, 42 insertions(+), 3 deletions(-) diff --git a/airflow/models/pool.py b/airflow/models/pool.py index feade7787e075..3d152eee80fe3 100644 --- a/airflow/models/pool.py +++ b/airflow/models/pool.py @@ -106,6 +106,8 @@ def slots_stats( pool_rows: Iterable[Tuple[str, int]] = query.all() for (pool_name, total_slots) in pool_rows: + if total_slots == -1: + total_slots = float('inf') # type: ignore pools[pool_name] = PoolStats(total=total_slots, running=0, queued=0, open=0) state_count_by_pool = ( @@ -115,8 +117,10 @@ def slots_stats( ).all() # calculate queued and running metrics - count: int for (pool_name, state, count) in state_count_by_pool: + # Some databases return decimal.Decimal here. + count = int(count) + stats_dict: Optional[PoolStats] = pools.get(pool_name) if not stats_dict: continue diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 954b395f2bf4b..faf19d9b096ea 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -1264,6 +1264,41 @@ def test_nonexistent_pool(self): assert 0 == len(res) session.rollback() + def test_infinite_pool(self): + dag_id = 'SchedulerJobTest.test_infinite_pool' + task_id = 'dummy' + dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16) + task = DummyOperator(dag=dag, task_id=task_id, pool="infinite_pool") + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) + + self.scheduler_job = SchedulerJob(subdir=os.devnull) + session = settings.Session() + + dag_model = DagModel( + dag_id=dag_id, + is_paused=False, + concurrency=dag.concurrency, + has_task_concurrency_limits=False, + ) + session.add(dag_model) + dr = dag.create_dagrun( + run_type=DagRunType.SCHEDULED, + execution_date=DEFAULT_DATE, + state=State.RUNNING, + ) + + ti = TaskInstance(task, dr.execution_date) + ti.state = State.SCHEDULED + session.merge(ti) + infinite_pool = Pool(pool='infinite_pool', slots=-1, description='infinite pool') + session.add(infinite_pool) + session.commit() + + res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session) + session.flush() + assert 1 == len(res) + session.rollback() + def test_find_executable_task_instances_none(self): dag_id = 'SchedulerJobTest.test_find_executable_task_instances_none' task_id_1 = 'dummy' diff --git a/tests/models/test_pool.py b/tests/models/test_pool.py index f4c7626615bda..7981e230f2af7 100644 --- a/tests/models/test_pool.py +++ b/tests/models/test_pool.py @@ -110,10 +110,10 @@ def test_infinite_slots(self): "running": 0, }, "test_pool": { - "open": -1, + "open": float('inf'), "queued": 1, "running": 1, - "total": -1, + "total": float('inf'), }, } == pool.slots_stats()