diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 0a93d07ffb992..22ba5decb4111 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -553,9 +553,9 @@ def _enqueue_task_instances_with_queued_state( queue=queue, ) - def _critical_section_execute_task_instances(self, session: Session) -> int: + def _critical_section_enqueue_task_instances(self, session: Session) -> int: """ - Attempts to execute TaskInstances that should be executed by the scheduler. + Enqueues TaskInstances for execution. There are three steps: 1. Pick TIs by priority with the constraint that they are in the expected states @@ -910,7 +910,7 @@ def _do_scheduling(self, session) -> int: - Then, via a Critical Section (locking the rows of the Pool model) we queue tasks, and then send them to the executor. - See docs of _critical_section_execute_task_instances for more. + See docs of _critical_section_enqueue_task_instances for more. :return: Number of TIs enqueued in this iteration :rtype: int @@ -958,7 +958,7 @@ def _do_scheduling(self, session) -> int: timer.start() # Find anything TIs in state SCHEDULED, try to QUEUE it (send it to the executor) - num_queued_tis = self._critical_section_execute_task_instances(session=session) + num_queued_tis = self._critical_section_enqueue_task_instances(session=session) # Make sure we only sent this metric if we obtained the lock, otherwise we'll skew the # metric, way down diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index a4fe6a0b3e4d8..b50c8bfd7fde0 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -395,7 +395,7 @@ def test_execute_task_instances_is_paused_wont_execute(self, session, dag_maker) (ti1,) = dr1.task_instances ti1.state = State.SCHEDULED - self.scheduler_job._critical_section_execute_task_instances(session) + self.scheduler_job._critical_section_enqueue_task_instances(session) ti1.refresh_from_db(session=session) assert State.SCHEDULED == ti1.state session.rollback() @@ -423,7 +423,7 @@ def test_execute_task_instances_backfill_tasks_wont_execute(self, dag_maker): assert dr1.is_backfill - self.scheduler_job._critical_section_execute_task_instances(session) + self.scheduler_job._critical_section_enqueue_task_instances(session) session.flush() ti1.refresh_from_db() assert State.SCHEDULED == ti1.state @@ -1258,7 +1258,7 @@ def test_enqueue_task_instances_sets_ti_state_to_None_if_dagrun_in_finish_state( assert ti.state == State.NONE mock_queue_command.assert_not_called() - def test_critical_section_execute_task_instances(self, dag_maker): + def test_critical_section_enqueue_task_instances(self, dag_maker): dag_id = 'SchedulerJobTest.test_execute_task_instances' task_id_1 = 'dummy_task' task_id_2 = 'dummy_task_nonexistent_queue' @@ -1297,7 +1297,7 @@ def test_critical_section_execute_task_instances(self, dag_maker): assert State.RUNNING == dr2.state - res = self.scheduler_job._critical_section_execute_task_instances(session) + res = self.scheduler_job._critical_section_enqueue_task_instances(session) # check that max_active_tasks is respected ti1.refresh_from_db() @@ -1346,7 +1346,7 @@ def _create_dagruns(): ti2.state = State.SCHEDULED session.flush() self.scheduler_job.max_tis_per_query = 2 - res = self.scheduler_job._critical_section_execute_task_instances(session) + res = self.scheduler_job._critical_section_enqueue_task_instances(session) assert 2 == res self.scheduler_job.max_tis_per_query = 8 @@ -1356,9 +1356,9 @@ def _create_dagruns(): mock_slots.return_value = 2 # Check that we don't "overfill" the executor assert 2 == res - res = self.scheduler_job._critical_section_execute_task_instances(session) + res = self.scheduler_job._critical_section_enqueue_task_instances(session) - res = self.scheduler_job._critical_section_execute_task_instances(session) + res = self.scheduler_job._critical_section_enqueue_task_instances(session) assert 4 == res for ti in tis: ti.refresh_from_db() @@ -1398,7 +1398,7 @@ def _create_dagruns(): self.scheduler_job.max_tis_per_query = 0 self.scheduler_job.executor = MagicMock(slots_available=36) - res = self.scheduler_job._critical_section_execute_task_instances(session) + res = self.scheduler_job._critical_section_enqueue_task_instances(session) # 20 dag runs * 2 tasks each = 40, but limited by number of slots available assert res == 36 session.rollback()