Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix tasks in an infinite slots pool were never scheduled #15247

Merged
merged 2 commits into from Jun 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion airflow/models/pool.py
Expand Up @@ -106,6 +106,8 @@ def slots_stats(

pool_rows: Iterable[Tuple[str, int]] = query.all()
for (pool_name, total_slots) in pool_rows:
if total_slots == -1:
total_slots = float('inf') # type: ignore
pools[pool_name] = PoolStats(total=total_slots, running=0, queued=0, open=0)

state_count_by_pool = (
Expand All @@ -115,8 +117,10 @@ def slots_stats(
).all()

# calculate queued and running metrics
count: int
for (pool_name, state, count) in state_count_by_pool:
# Some databases return decimal.Decimal here.
count = int(count)

stats_dict: Optional[PoolStats] = pools.get(pool_name)
if not stats_dict:
continue
Expand Down
35 changes: 35 additions & 0 deletions tests/jobs/test_scheduler_job.py
Expand Up @@ -1423,6 +1423,41 @@ def test_nonexistent_pool(self):
assert 0 == len(res)
session.rollback()

def test_infinite_pool(self):
dag_id = 'SchedulerJobTest.test_infinite_pool'
task_id = 'dummy'
dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16)
task = DummyOperator(dag=dag, task_id=task_id, pool="infinite_pool")
dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))

self.scheduler_job = SchedulerJob(subdir=os.devnull)
session = settings.Session()

dag_model = DagModel(
dag_id=dag_id,
is_paused=False,
concurrency=dag.concurrency,
has_task_concurrency_limits=False,
)
session.add(dag_model)
dr = dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
state=State.RUNNING,
)

ti = TaskInstance(task, dr.execution_date)
ti.state = State.SCHEDULED
session.merge(ti)
infinite_pool = Pool(pool='infinite_pool', slots=-1, description='infinite pool')
session.add(infinite_pool)
session.commit()

res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
session.flush()
assert 1 == len(res)
session.rollback()

def test_find_executable_task_instances_none(self):
dag_id = 'SchedulerJobTest.test_find_executable_task_instances_none'
task_id_1 = 'dummy'
Expand Down
4 changes: 2 additions & 2 deletions tests/models/test_pool.py
Expand Up @@ -110,10 +110,10 @@ def test_infinite_slots(self):
"running": 0,
},
"test_pool": {
"open": -1,
"open": float('inf'),
"queued": 1,
"running": 1,
"total": -1,
"total": float('inf'),
},
} == pool.slots_stats()

Expand Down