diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index cece87e4b3b10..7514d92c1934c 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -938,6 +938,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
             .filter(not_(DM.is_paused))
             .filter(TI.state == State.SCHEDULED)
             .options(selectinload('dag_model'))
+            .order_by(-TI.priority_weight, TI.execution_date)
         )
         starved_pools = [pool_name for pool_name, stats in pools.items() if stats['open'] <= 0]
         if starved_pools:
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index faf19d9b096ea..36bc1b584a191 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1177,6 +1177,165 @@ def test_find_executable_task_instances_pool(self):
         assert tis[3].key in res_keys
         session.rollback()
 
+    def test_find_executable_task_instances_order_execution_date(self):
+        dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date-a'
+        dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date-b'
+        task_id = 'task-a'
+        dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
+        dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
+        dag1_task = DummyOperator(dag=dag_1, task_id=task_id)
+        dag2_task = DummyOperator(dag=dag_2, task_id=task_id)
+        dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1))
+        dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2))
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        session = settings.Session()
+
+        dag_model_1 = DagModel(
+            dag_id=dag_id_1,
+            is_paused=False,
+            concurrency=dag_1.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_1)
+        dag_model_2 = DagModel(
+            dag_id=dag_id_2,
+            is_paused=False,
+            concurrency=dag_2.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_2)
+        dr1 = dag_1.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE + timedelta(hours=1),
+            state=State.RUNNING,
+        )
+        dr2 = dag_2.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+
+        tis = [
+            TaskInstance(dag1_task, dr1.execution_date),
+            TaskInstance(dag2_task, dr2.execution_date),
+        ]
+        for ti in tis:
+            ti.state = State.SCHEDULED
+            session.merge(ti)
+        session.flush()
+
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
+        session.flush()
+        assert [ti.key for ti in res] == [tis[1].key]
+        session.rollback()
+
+    def test_find_executable_task_instances_order_priority(self):
+        dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_priority-a'
+        dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_priority-b'
+        task_id = 'task-a'
+        dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
+        dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
+        dag1_task = DummyOperator(dag=dag_1, task_id=task_id, priority_weight=1)
+        dag2_task = DummyOperator(dag=dag_2, task_id=task_id, priority_weight=4)
+        dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1))
+        dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2))
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        session = settings.Session()
+
+        dag_model_1 = DagModel(
+            dag_id=dag_id_1,
+            is_paused=False,
+            concurrency=dag_1.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_1)
+        dag_model_2 = DagModel(
+            dag_id=dag_id_2,
+            is_paused=False,
+            concurrency=dag_2.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_2)
+        dr1 = dag_1.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+        dr2 = dag_2.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+
+        tis = [
+            TaskInstance(dag1_task, dr1.execution_date),
+            TaskInstance(dag2_task, dr2.execution_date),
+        ]
+        for ti in tis:
+            ti.state = State.SCHEDULED
+            session.merge(ti)
+        session.flush()
+
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
+        session.flush()
+        assert [ti.key for ti in res] == [tis[1].key]
+        session.rollback()
+
+    def test_find_executable_task_instances_order_execution_date_and_priority(self):
+        dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date_and_priority-a'
+        dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date_and_priority-b'
+        task_id = 'task-a'
+        dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
+        dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
+        dag1_task = DummyOperator(dag=dag_1, task_id=task_id, priority_weight=1)
+        dag2_task = DummyOperator(dag=dag_2, task_id=task_id, priority_weight=4)
+        dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1))
+        dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2))
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        session = settings.Session()
+
+        dag_model_1 = DagModel(
+            dag_id=dag_id_1,
+            is_paused=False,
+            concurrency=dag_1.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_1)
+        dag_model_2 = DagModel(
+            dag_id=dag_id_2,
+            is_paused=False,
+            concurrency=dag_2.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_2)
+        dr1 = dag_1.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+        dr2 = dag_2.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE + timedelta(hours=1),
+            state=State.RUNNING,
+        )
+
+        tis = [
+            TaskInstance(dag1_task, dr1.execution_date),
+            TaskInstance(dag2_task, dr2.execution_date),
+        ]
+        for ti in tis:
+            ti.state = State.SCHEDULED
+            session.merge(ti)
+        session.flush()
+
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
+        session.flush()
+        assert [ti.key for ti in res] == [tis[1].key]
+        session.rollback()
+
     def test_find_executable_task_instances_in_default_pool(self):
         set_default_pool_slots(1)
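
A note on the ordering expression, separate from the patch itself: SQLAlchemy renders the unary minus in `-TI.priority_weight` as a SQL negation, so sorting ascending on the negated column is equivalent to sorting `priority_weight` descending, and `TI.execution_date` then breaks ties in favour of the oldest run. The following minimal, self-contained sketch demonstrates the same idiom; the `Task` model, table, and column names here are illustrative stand-ins, not Airflow's real `TaskInstance` model.

# Sketch only: shows that order_by(-col_a, col_b) sorts col_a descending,
# col_b ascending. The Task model below is an assumption for demonstration.
from datetime import datetime, timedelta

from sqlalchemy import Column, DateTime, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()


class Task(Base):
    __tablename__ = 'task'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    priority_weight = Column(Integer)
    execution_date = Column(DateTime)


engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

base_date = datetime(2021, 1, 1)
session.add_all(
    [
        Task(name='low-old', priority_weight=1, execution_date=base_date),
        Task(name='high-new', priority_weight=4, execution_date=base_date + timedelta(hours=1)),
        Task(name='high-old', priority_weight=4, execution_date=base_date),
    ]
)
session.flush()

# Same ordering expression as the scheduler change above: highest priority
# first, ties broken by the oldest execution_date.
ordered = session.query(Task).order_by(-Task.priority_weight, Task.execution_date).all()
assert [t.name for t in ordered] == ['high-old', 'high-new', 'low-old']

An equivalent, arguably more explicit spelling of the same ordering would be `.order_by(Task.priority_weight.desc(), Task.execution_date.asc())`.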