Skip to content

Commit

Permalink
Fix tasks in an infinite slots pool were never scheduled (#15247)
Browse files Browse the repository at this point in the history
Infinite pools: Make their `total_slots` be `inf` instead of `-1`

(cherry picked from commit 96f7643)
  • Loading branch information
bperson authored and ashb committed Jun 22, 2021
1 parent c6313e4 commit 1ca495c
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 3 deletions.
6 changes: 5 additions & 1 deletion airflow/models/pool.py
Expand Up @@ -106,6 +106,8 @@ def slots_stats(

pool_rows: Iterable[Tuple[str, int]] = query.all()
for (pool_name, total_slots) in pool_rows:
if total_slots == -1:
total_slots = float('inf') # type: ignore
pools[pool_name] = PoolStats(total=total_slots, running=0, queued=0, open=0)

state_count_by_pool = (
Expand All @@ -115,8 +117,10 @@ def slots_stats(
).all()

# calculate queued and running metrics
count: int
for (pool_name, state, count) in state_count_by_pool:
# Some databases return decimal.Decimal here.
count = int(count)

stats_dict: Optional[PoolStats] = pools.get(pool_name)
if not stats_dict:
continue
Expand Down
35 changes: 35 additions & 0 deletions tests/jobs/test_scheduler_job.py
Expand Up @@ -1264,6 +1264,41 @@ def test_nonexistent_pool(self):
assert 0 == len(res)
session.rollback()

def test_infinite_pool(self):
dag_id = 'SchedulerJobTest.test_infinite_pool'
task_id = 'dummy'
dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16)
task = DummyOperator(dag=dag, task_id=task_id, pool="infinite_pool")
dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))

self.scheduler_job = SchedulerJob(subdir=os.devnull)
session = settings.Session()

dag_model = DagModel(
dag_id=dag_id,
is_paused=False,
concurrency=dag.concurrency,
has_task_concurrency_limits=False,
)
session.add(dag_model)
dr = dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
state=State.RUNNING,
)

ti = TaskInstance(task, dr.execution_date)
ti.state = State.SCHEDULED
session.merge(ti)
infinite_pool = Pool(pool='infinite_pool', slots=-1, description='infinite pool')
session.add(infinite_pool)
session.commit()

res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
session.flush()
assert 1 == len(res)
session.rollback()

def test_find_executable_task_instances_none(self):
dag_id = 'SchedulerJobTest.test_find_executable_task_instances_none'
task_id_1 = 'dummy'
Expand Down
4 changes: 2 additions & 2 deletions tests/models/test_pool.py
Expand Up @@ -110,10 +110,10 @@ def test_infinite_slots(self):
"running": 0,
},
"test_pool": {
"open": -1,
"open": float('inf'),
"queued": 1,
"running": 1,
"total": -1,
"total": float('inf'),
},
} == pool.slots_stats()

Expand Down

0 comments on commit 1ca495c

Please sign in to comment.