Skip to content

Commit

Permalink
clearer method name in scheduler_job.py (#23702)
Browse files Browse the repository at this point in the history
  • Loading branch information
RNHTTR committed May 18, 2022
1 parent 3849ebb commit 5d743e7
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 12 deletions.
8 changes: 4 additions & 4 deletions airflow/jobs/scheduler_job.py
Expand Up @@ -553,9 +553,9 @@ def _enqueue_task_instances_with_queued_state(
queue=queue,
)

def _critical_section_execute_task_instances(self, session: Session) -> int:
def _critical_section_enqueue_task_instances(self, session: Session) -> int:
"""
Attempts to execute TaskInstances that should be executed by the scheduler.
Enqueues TaskInstances for execution.
There are three steps:
1. Pick TIs by priority with the constraint that they are in the expected states
Expand Down Expand Up @@ -910,7 +910,7 @@ def _do_scheduling(self, session) -> int:
- Then, via a Critical Section (locking the rows of the Pool model) we queue tasks, and then send them
to the executor.
See docs of _critical_section_execute_task_instances for more.
See docs of _critical_section_enqueue_task_instances for more.
:return: Number of TIs enqueued in this iteration
:rtype: int
Expand Down Expand Up @@ -958,7 +958,7 @@ def _do_scheduling(self, session) -> int:
timer.start()

# Find anything TIs in state SCHEDULED, try to QUEUE it (send it to the executor)
num_queued_tis = self._critical_section_execute_task_instances(session=session)
num_queued_tis = self._critical_section_enqueue_task_instances(session=session)

# Make sure we only sent this metric if we obtained the lock, otherwise we'll skew the
# metric, way down
Expand Down
16 changes: 8 additions & 8 deletions tests/jobs/test_scheduler_job.py
Expand Up @@ -395,7 +395,7 @@ def test_execute_task_instances_is_paused_wont_execute(self, session, dag_maker)
(ti1,) = dr1.task_instances
ti1.state = State.SCHEDULED

self.scheduler_job._critical_section_execute_task_instances(session)
self.scheduler_job._critical_section_enqueue_task_instances(session)
ti1.refresh_from_db(session=session)
assert State.SCHEDULED == ti1.state
session.rollback()
Expand Down Expand Up @@ -423,7 +423,7 @@ def test_execute_task_instances_backfill_tasks_wont_execute(self, dag_maker):

assert dr1.is_backfill

self.scheduler_job._critical_section_execute_task_instances(session)
self.scheduler_job._critical_section_enqueue_task_instances(session)
session.flush()
ti1.refresh_from_db()
assert State.SCHEDULED == ti1.state
Expand Down Expand Up @@ -1258,7 +1258,7 @@ def test_enqueue_task_instances_sets_ti_state_to_None_if_dagrun_in_finish_state(
assert ti.state == State.NONE
mock_queue_command.assert_not_called()

def test_critical_section_execute_task_instances(self, dag_maker):
def test_critical_section_enqueue_task_instances(self, dag_maker):
dag_id = 'SchedulerJobTest.test_execute_task_instances'
task_id_1 = 'dummy_task'
task_id_2 = 'dummy_task_nonexistent_queue'
Expand Down Expand Up @@ -1297,7 +1297,7 @@ def test_critical_section_execute_task_instances(self, dag_maker):

assert State.RUNNING == dr2.state

res = self.scheduler_job._critical_section_execute_task_instances(session)
res = self.scheduler_job._critical_section_enqueue_task_instances(session)

# check that max_active_tasks is respected
ti1.refresh_from_db()
Expand Down Expand Up @@ -1346,7 +1346,7 @@ def _create_dagruns():
ti2.state = State.SCHEDULED
session.flush()
self.scheduler_job.max_tis_per_query = 2
res = self.scheduler_job._critical_section_execute_task_instances(session)
res = self.scheduler_job._critical_section_enqueue_task_instances(session)
assert 2 == res

self.scheduler_job.max_tis_per_query = 8
Expand All @@ -1356,9 +1356,9 @@ def _create_dagruns():
mock_slots.return_value = 2
# Check that we don't "overfill" the executor
assert 2 == res
res = self.scheduler_job._critical_section_execute_task_instances(session)
res = self.scheduler_job._critical_section_enqueue_task_instances(session)

res = self.scheduler_job._critical_section_execute_task_instances(session)
res = self.scheduler_job._critical_section_enqueue_task_instances(session)
assert 4 == res
for ti in tis:
ti.refresh_from_db()
Expand Down Expand Up @@ -1398,7 +1398,7 @@ def _create_dagruns():
self.scheduler_job.max_tis_per_query = 0
self.scheduler_job.executor = MagicMock(slots_available=36)

res = self.scheduler_job._critical_section_execute_task_instances(session)
res = self.scheduler_job._critical_section_enqueue_task_instances(session)
# 20 dag runs * 2 tasks each = 40, but limited by number of slots available
assert res == 36
session.rollback()
Expand Down

0 comments on commit 5d743e7

Please sign in to comment.