Skip to content

Commit

Permalink
Queue tasks with higher priority and earlier execution_date first. (#15210)
Browse files Browse the repository at this point in the history

Co-authored-by: Ginevra Gaudioso <ggaudioso@vectra.ai>
Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com>
  • Loading branch information
3 people committed Jun 14, 2021
1 parent 4752fb3 commit 943292b
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 0 deletions.
1 change: 1 addition & 0 deletions airflow/jobs/scheduler_job.py
Expand Up @@ -939,6 +939,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
.filter(not_(DM.is_paused))
.filter(TI.state == State.SCHEDULED)
.options(selectinload('dag_model'))
.order_by(-TI.priority_weight, TI.execution_date)
)
starved_pools = [pool_name for pool_name, stats in pools.items() if stats['open'] <= 0]
if starved_pools:
Expand Down
159 changes: 159 additions & 0 deletions tests/jobs/test_scheduler_job.py
Expand Up @@ -1177,6 +1177,165 @@ def test_find_executable_task_instances_pool(self):
assert tis[3].key in res_keys
session.rollback()

def test_find_executable_task_instances_order_execution_date(self):
    """With equal priority, the TI belonging to the earlier execution_date run
    must be queued first: only dag-b's TI (at DEFAULT_DATE) fits in max_tis=1."""
    dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date-a'
    dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date-b'
    task_id = 'task-a'

    # Build two single-task DAGs and round-trip them through serialization,
    # mirroring what the scheduler actually works with.
    dags = [DAG(dag_id=d, start_date=DEFAULT_DATE, concurrency=16) for d in (dag_id_1, dag_id_2)]
    tasks = [DummyOperator(dag=dag, task_id=task_id) for dag in dags]
    dags = [SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) for dag in dags]

    self.scheduler_job = SchedulerJob(subdir=os.devnull)
    session = settings.Session()

    # Register an unpaused DagModel row for each DAG.
    for dag_id, dag in zip((dag_id_1, dag_id_2), dags):
        session.add(
            DagModel(
                dag_id=dag_id,
                is_paused=False,
                concurrency=dag.concurrency,
                has_task_concurrency_limits=False,
            )
        )

    # dag-a runs one hour LATER than dag-b, so dag-b should win.
    run_1 = dags[0].create_dagrun(
        run_type=DagRunType.SCHEDULED,
        execution_date=DEFAULT_DATE + timedelta(hours=1),
        state=State.RUNNING,
    )
    run_2 = dags[1].create_dagrun(
        run_type=DagRunType.SCHEDULED,
        execution_date=DEFAULT_DATE,
        state=State.RUNNING,
    )

    tis = [TaskInstance(task, run.execution_date) for task, run in zip(tasks, (run_1, run_2))]
    for ti in tis:
        ti.state = State.SCHEDULED
        session.merge(ti)
    session.flush()

    res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
    session.flush()
    # Earlier execution_date (dag-b's TI) is the one selected.
    assert [ti.key for ti in res] == [tis[1].key]
    session.rollback()

def test_find_executable_task_instances_order_priority(self):
    """With equal execution_date, the higher-priority TI must be queued first:
    only dag-b's TI (priority_weight=4 vs 1) fits in max_tis=1."""
    dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_priority-a'
    dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_priority-b'
    task_id = 'task-a'

    # Two single-task DAGs whose tasks differ only in priority_weight.
    dag_a = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
    dag_b = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
    low_priority_task = DummyOperator(dag=dag_a, task_id=task_id, priority_weight=1)
    high_priority_task = DummyOperator(dag=dag_b, task_id=task_id, priority_weight=4)
    # Round-trip through serialization, as the scheduler would see them.
    dag_a = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_a))
    dag_b = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_b))

    self.scheduler_job = SchedulerJob(subdir=os.devnull)
    session = settings.Session()

    # Register an unpaused DagModel row for each DAG.
    for dag_id, dag in ((dag_id_1, dag_a), (dag_id_2, dag_b)):
        session.add(
            DagModel(
                dag_id=dag_id,
                is_paused=False,
                concurrency=dag.concurrency,
                has_task_concurrency_limits=False,
            )
        )

    # Identical execution_date for both runs — only priority differs.
    run_a = dag_a.create_dagrun(
        run_type=DagRunType.SCHEDULED,
        execution_date=DEFAULT_DATE,
        state=State.RUNNING,
    )
    run_b = dag_b.create_dagrun(
        run_type=DagRunType.SCHEDULED,
        execution_date=DEFAULT_DATE,
        state=State.RUNNING,
    )

    tis = [
        TaskInstance(low_priority_task, run_a.execution_date),
        TaskInstance(high_priority_task, run_b.execution_date),
    ]
    for ti in tis:
        ti.state = State.SCHEDULED
        session.merge(ti)
    session.flush()

    res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
    session.flush()
    # The higher-priority TI (dag-b's) is the one selected.
    assert [ti.key for ti in res] == [tis[1].key]
    session.rollback()

def test_find_executable_task_instances_order_execution_date_and_priority(self):
    """Priority beats execution_date: dag-b's TI has a LATER execution_date
    but a higher priority_weight (4 vs 1), and is still queued first."""
    dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date_and_priority-a'
    dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date_and_priority-b'
    task_id = 'task-a'

    # Two single-task DAGs: dag-a low priority, dag-b high priority.
    dag_a = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
    dag_b = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
    low_priority_task = DummyOperator(dag=dag_a, task_id=task_id, priority_weight=1)
    high_priority_task = DummyOperator(dag=dag_b, task_id=task_id, priority_weight=4)
    # Round-trip through serialization, as the scheduler would see them.
    dag_a = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_a))
    dag_b = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_b))

    self.scheduler_job = SchedulerJob(subdir=os.devnull)
    session = settings.Session()

    # Register an unpaused DagModel row for each DAG.
    for dag_id, dag in ((dag_id_1, dag_a), (dag_id_2, dag_b)):
        session.add(
            DagModel(
                dag_id=dag_id,
                is_paused=False,
                concurrency=dag.concurrency,
                has_task_concurrency_limits=False,
            )
        )

    # dag-b's run is an hour later, but its task's priority should dominate.
    run_a = dag_a.create_dagrun(
        run_type=DagRunType.SCHEDULED,
        execution_date=DEFAULT_DATE,
        state=State.RUNNING,
    )
    run_b = dag_b.create_dagrun(
        run_type=DagRunType.SCHEDULED,
        execution_date=DEFAULT_DATE + timedelta(hours=1),
        state=State.RUNNING,
    )

    tis = [
        TaskInstance(low_priority_task, run_a.execution_date),
        TaskInstance(high_priority_task, run_b.execution_date),
    ]
    for ti in tis:
        ti.state = State.SCHEDULED
        session.merge(ti)
    session.flush()

    res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
    session.flush()
    # Despite the later execution_date, the high-priority TI wins.
    assert [ti.key for ti in res] == [tis[1].key]
    session.rollback()

def test_find_executable_task_instances_in_default_pool(self):
set_default_pool_slots(1)

Expand Down

0 comments on commit 943292b

Please sign in to comment.