From e5a95b8de2b5af75f48c7a4590ae21a806506afc Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Wed, 9 Jun 2021 01:01:58 +0100 Subject: [PATCH 01/15] Fix DAG run state not updated while DAG is paused The state of a DAG run does not update while the DAG is paused. The tasks continue to run if the DAG run was kicked off before the DAG was paused and eventually finish and are marked correctly. The DAG run state does not get updated and stays in Running state until the DAG is unpaused. This change fixes it by running a check at intervals and updating the states (where possible) of DagRuns whose tasks have finished running while the DAG is paused. --- airflow/jobs/scheduler_job.py | 15 +++++++++++++++ airflow/models/dagrun.py | 21 +++++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 3b7e842f93da9..a94d54e1d8e41 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -1344,6 +1344,10 @@ def _run_scheduler_loop(self) -> None: conf.getfloat('scheduler', 'clean_tis_without_dagrun_interval', fallback=15.0), self._clean_tis_without_dagrun, ) + timers.call_regular_interval( + conf.getfloat('scheduler', 'update_dagrun_state_for_paused_dag_interval', fallback=15.0), + self._update_dagrun_state_for_paused_dag, + ) for loop_count in itertools.count(start=1): with Stats.timer() as timer: @@ -1418,6 +1422,17 @@ def _clean_tis_without_dagrun(self, session): raise guard.commit() + @provide_session + def _update_dagrun_state_for_paused_dag(self, session=None): + """ + Checks for paused dags with DagRuns in the running state and + update the DagRun state if possible + """ + dag_runs = DR.get_running_dagruns_in_paused_dags(session=session) + for dagrun in dag_runs: + dagrun.dag = self.dagbag.get_dag(dagrun.dag_id) # expensive. Any idea? 
+ dagrun.update_state(session=session, execute_callbacks=False) + def _do_scheduling(self, session) -> int: """ This function is where the main scheduling decisions take places. It: diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index e18351d3c1fb8..b39e5a8481f3e 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -183,6 +183,27 @@ def refresh_from_db(self, session: Session = None): self.id = dr.id self.state = dr.state + @classmethod + def get_running_dagruns_in_paused_dags(cls, session: Session, max_number: Optional[int] = None): + """Returns the DagRuns that are in running state but the DAG is paused""" + from airflow.models import DagModel + + if max_number is None: + max_number = cls.DEFAULT_DAGRUNS_TO_EXAMINE + + query = ( + session.query(cls) + .filter(cls.state == State.RUNNING, cls.run_type != DagRunType.BACKFILL_JOB) + .join( + DagModel, + DagModel.dag_id == cls.dag_id, + ) + .filter(DagModel.is_paused == expression.true(), DagModel.is_active == expression.true()) + ) + return with_row_locks( + query.limit(max_number), of=cls, session=session, **skip_locked(session=session) + ) + @classmethod def next_dagruns_to_examine( cls, From c21f1609a1b6c347aa61d09574bdef3c4f2fe3a8 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Wed, 9 Jun 2021 09:51:19 +0100 Subject: [PATCH 02/15] fixup! 
Fix DAG run state not updated while DAG is paused --- airflow/config_templates/config.yml | 8 +++ airflow/config_templates/default_airflow.cfg | 4 ++ airflow/jobs/scheduler_job.py | 2 +- tests/jobs/test_scheduler_job.py | 51 ++++++++++++++++++++ tests/models/test_dagrun.py | 39 +++++++++++++++ 5 files changed, 103 insertions(+), 1 deletion(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 3e083d4b90b70..4a0296295bf94 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1709,6 +1709,14 @@ type: float example: ~ default: "15.0" + - name: update_dagrun_state_for_paused_dag_interval + description: | + How often (in seconds) to check paused DAGs for DagRuns in running state + and update states of DagRuns whose tasks finished the DagRun + version_added: 2.1.1 + type: float + example: ~ + default: "30.0" - name: scheduler_heartbeat_sec description: | The scheduler constantly tries to trigger new tasks (look at the diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index c03e116746f96..a886b6ed96fc1 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -854,6 +854,10 @@ job_heartbeat_sec = 5 # that no longer have a matching DagRun clean_tis_without_dagrun_interval = 15.0 +# How often (in seconds) to check paused DAGs for DagRuns in running state +# and update states of DagRuns whose tasks finished the DagRun +update_dagrun_state_for_paused_dag_interval = 30.0 + # The scheduler constantly tries to trigger new tasks (look at the # scheduler section in the docs for more information). This defines # how often the scheduler should run (in seconds). 
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index a94d54e1d8e41..f647b39b34204 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -1345,7 +1345,7 @@ def _run_scheduler_loop(self) -> None: self._clean_tis_without_dagrun, ) timers.call_regular_interval( - conf.getfloat('scheduler', 'update_dagrun_state_for_paused_dag_interval', fallback=15.0), + conf.getfloat('scheduler', 'update_dagrun_state_for_paused_dag_interval', fallback=30.0), self._update_dagrun_state_for_paused_dag, ) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 15dac7358efb7..bdecb56f1c1ab 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -2124,6 +2124,57 @@ def test_scheduler_loop_should_change_state_for_tis_without_dagrun( assert ti.start_date == ti.end_date assert ti.duration is not None + def test_scheduler_loop_should_update_state_of_finished_dagruns_with_dag_paused(self): + """Test that with DAG paused, DagRun state will update when the tasks finishes the run""" + dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE) + op1 = DummyOperator(task_id='dummy', dag=dag, owner='airflow') + + session = settings.Session() + orm_dag = DagModel( + dag_id=dag.dag_id, + has_task_concurrency_limits=False, + next_dagrun=dag.start_date, + next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE), + is_active=True, + is_paused=True, + ) + session.add(orm_dag) + session.flush() + # Write Dag to DB + dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False) + dagbag.bag_dag(dag, root_dag=dag) + dagbag.sync_to_db() + + dr = dag.create_dagrun( + run_type=DagRunType.SCHEDULED, + state=State.RUNNING, + execution_date=DEFAULT_DATE, + start_date=DEFAULT_DATE, + session=session, + ) + ti = dr.get_task_instance(task_id=op1.task_id, session=session) + ti.state = State.SUCCESS + session.commit() + assert dr.state == State.RUNNING + # This poll interval is 
large, bug the scheduler doesn't sleep that + # long, instead we hit the update_dagrun_state_for_paused_dag_interval instead + self.scheduler_job = SchedulerJob(num_runs=2, processor_poll_interval=30) + self.scheduler_job.dagbag = dagbag + executor = MockExecutor(do_update=False) + executor.queued_tasks + self.scheduler_job.executor = executor + processor = mock.MagicMock() + processor.done = False + self.scheduler_job.processor_agent = processor + + with mock.patch.object(settings, "USE_JOB_SCHEDULE", False), conf_vars( + {('scheduler', 'update_dagrun_state_for_paused_dag_interval'): '0.001'} + ): + self.scheduler_job._run_scheduler_loop() + + dr = session.query(DagRun).filter(DagRun.id == dr.id).first() + assert dr.state == State.SUCCESS + @mock.patch('airflow.jobs.scheduler_job.DagFileProcessorAgent') def test_executor_end_called(self, mock_processor_agent): """ diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py index 78991999ffde1..90b0ba6590b49 100644 --- a/tests/models/test_dagrun.py +++ b/tests/models/test_dagrun.py @@ -731,6 +731,45 @@ def test_next_dagruns_to_examine_only_unpaused(self): session.rollback() session.close() + def test_get_running_dagruns_in_paused_dags_only_paused(self): + """ + Check that "get_running_dagruns_in_paused_dags" ignores runs from unpaused/inactive DAGs + """ + dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE) + DummyOperator(task_id='dummy', dag=dag, owner='airflow') + + session = settings.Session() + orm_dag = DagModel( + dag_id=dag.dag_id, + has_task_concurrency_limits=False, + next_dagrun=dag.start_date, + next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE), + is_active=True, + is_paused=True, + ) + session.add(orm_dag) + session.flush() + dr = dag.create_dagrun( + run_type=DagRunType.SCHEDULED, + state=State.RUNNING, + execution_date=DEFAULT_DATE, + start_date=DEFAULT_DATE, + session=session, + ) + + runs = DagRun.get_running_dagruns_in_paused_dags(session).all() + + assert runs == [dr] + 
+ orm_dag.is_paused = False + session.flush() + + runs = DagRun.get_running_dagruns_in_paused_dags(session).all() + assert runs == [] + + session.rollback() + session.close() + @mock.patch.object(Stats, 'timing') def test_no_scheduling_delay_for_nonscheduled_runs(self, stats_mock): """ From 2dc04f6ee72298336f8f7b8c2a8a1763eae759a2 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Thu, 10 Jun 2021 18:53:11 +0100 Subject: [PATCH 03/15] Move implementation to task exit --- airflow/jobs/local_task_job.py | 12 ++++++++ airflow/jobs/scheduler_job.py | 16 ---------- tests/jobs/test_local_task_job.py | 42 +++++++++++++++++++++++++ tests/jobs/test_scheduler_job.py | 51 ------------------------------- 4 files changed, 54 insertions(+), 67 deletions(-) diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py index 35d0422e7d4b1..1e4222b85e47e 100644 --- a/airflow/jobs/local_task_job.py +++ b/airflow/jobs/local_task_job.py @@ -167,6 +167,7 @@ def handle_task_exit(self, return_code: int) -> None: if not self.task_instance.test_mode: if conf.getboolean('scheduler', 'schedule_after_task_execution', fallback=True): self._run_mini_scheduler_on_child_tasks() + self._update_dagrun_state_for_paused_dag() def on_kill(self): self.task_runner.terminate() @@ -264,3 +265,14 @@ def _run_mini_scheduler_on_child_tasks(self, session=None) -> None: exc_info=True, ) session.rollback() + + @provide_session + def _update_dagrun_state_for_paused_dag(self, session=None): + """ + Checks for paused dags with DagRuns in the running state and + update the DagRun state if possible + """ + dag_runs = DagRun.get_running_dagruns_in_paused_dags(session=session) + for dagrun in dag_runs: + dagrun.dag = self.task_instance.task.dag + dagrun.update_state(session=session, execute_callbacks=False) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index f647b39b34204..b1450c460d825 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ 
-1344,11 +1344,6 @@ def _run_scheduler_loop(self) -> None: conf.getfloat('scheduler', 'clean_tis_without_dagrun_interval', fallback=15.0), self._clean_tis_without_dagrun, ) - timers.call_regular_interval( - conf.getfloat('scheduler', 'update_dagrun_state_for_paused_dag_interval', fallback=30.0), - self._update_dagrun_state_for_paused_dag, - ) - for loop_count in itertools.count(start=1): with Stats.timer() as timer: @@ -1422,17 +1417,6 @@ def _clean_tis_without_dagrun(self, session): raise guard.commit() - @provide_session - def _update_dagrun_state_for_paused_dag(self, session=None): - """ - Checks for paused dags with DagRuns in the running state and - update the DagRun state if possible - """ - dag_runs = DR.get_running_dagruns_in_paused_dags(session=session) - for dagrun in dag_runs: - dagrun.dag = self.dagbag.get_dag(dagrun.dag_id) # expensive. Any idea? - dagrun.update_state(session=session, execute_callbacks=False) - def _do_scheduling(self, session) -> int: """ This function is where the main scheduling decisions take places. 
It: diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py index 905d83ae569e2..e23ac97af378a 100644 --- a/tests/jobs/test_local_task_job.py +++ b/tests/jobs/test_local_task_job.py @@ -36,6 +36,7 @@ from airflow.jobs.scheduler_job import SchedulerJob from airflow.models.dag import DAG, DagModel from airflow.models.dagbag import DagBag +from airflow.models.dagrun import DagRun from airflow.models.taskinstance import TaskInstance from airflow.operators.dummy import DummyOperator from airflow.operators.python import PythonOperator @@ -45,6 +46,7 @@ from airflow.utils.session import create_session from airflow.utils.state import State from airflow.utils.timeout import timeout +from airflow.utils.types import DagRunType from tests.test_utils import db from tests.test_utils.asserts import assert_queries_count from tests.test_utils.config import conf_vars @@ -686,6 +688,46 @@ def test_fast_follow( if scheduler_job.processor_agent: scheduler_job.processor_agent.end() + def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self): + """Test that with DAG paused, DagRun state will update when the tasks finishes the run""" + dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE) + op1 = PythonOperator(task_id='dummy', dag=dag, owner='airflow', python_callable=lambda: True) + + session = settings.Session() + orm_dag = DagModel( + dag_id=dag.dag_id, + has_task_concurrency_limits=False, + next_dagrun=dag.start_date, + next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE), + is_active=True, + is_paused=True, + ) + session.add(orm_dag) + session.flush() + # Write Dag to DB + dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False) + dagbag.bag_dag(dag, root_dag=dag) + dagbag.sync_to_db() + + dr = dag.create_dagrun( + run_type=DagRunType.SCHEDULED, + state=State.RUNNING, + execution_date=DEFAULT_DATE, + start_date=DEFAULT_DATE, + session=session, + ) + ti = TaskInstance(op1, dr.execution_date) + 
ti.refresh_from_db() + ti.state = State.SUCCESS + session.commit() + assert dr.state == State.RUNNING + + job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor()) + job1.task_runner = StandardTaskRunner(job1) + job1.run() + dr = session.query(DagRun).filter(DagRun.id == dr.id).first() + assert dr.state == State.SUCCESS + @pytest.fixture() def clean_db_helper(): diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index bdecb56f1c1ab..15dac7358efb7 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -2124,57 +2124,6 @@ def test_scheduler_loop_should_change_state_for_tis_without_dagrun( assert ti.start_date == ti.end_date assert ti.duration is not None - def test_scheduler_loop_should_update_state_of_finished_dagruns_with_dag_paused(self): - """Test that with DAG paused, DagRun state will update when the tasks finishes the run""" - dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE) - op1 = DummyOperator(task_id='dummy', dag=dag, owner='airflow') - - session = settings.Session() - orm_dag = DagModel( - dag_id=dag.dag_id, - has_task_concurrency_limits=False, - next_dagrun=dag.start_date, - next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE), - is_active=True, - is_paused=True, - ) - session.add(orm_dag) - session.flush() - # Write Dag to DB - dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False) - dagbag.bag_dag(dag, root_dag=dag) - dagbag.sync_to_db() - - dr = dag.create_dagrun( - run_type=DagRunType.SCHEDULED, - state=State.RUNNING, - execution_date=DEFAULT_DATE, - start_date=DEFAULT_DATE, - session=session, - ) - ti = dr.get_task_instance(task_id=op1.task_id, session=session) - ti.state = State.SUCCESS - session.commit() - assert dr.state == State.RUNNING - # This poll interval is large, bug the scheduler doesn't sleep that - # long, instead we hit the update_dagrun_state_for_paused_dag_interval instead - 
self.scheduler_job = SchedulerJob(num_runs=2, processor_poll_interval=30) - self.scheduler_job.dagbag = dagbag - executor = MockExecutor(do_update=False) - executor.queued_tasks - self.scheduler_job.executor = executor - processor = mock.MagicMock() - processor.done = False - self.scheduler_job.processor_agent = processor - - with mock.patch.object(settings, "USE_JOB_SCHEDULE", False), conf_vars( - {('scheduler', 'update_dagrun_state_for_paused_dag_interval'): '0.001'} - ): - self.scheduler_job._run_scheduler_loop() - - dr = session.query(DagRun).filter(DagRun.id == dr.id).first() - assert dr.state == State.SUCCESS - @mock.patch('airflow.jobs.scheduler_job.DagFileProcessorAgent') def test_executor_end_called(self, mock_processor_agent): """ From 9aaaaa0c52ac55a66a9ee4c28aefd18a5351b96c Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Thu, 10 Jun 2021 19:06:56 +0100 Subject: [PATCH 04/15] fixup! Move implementation to task exit --- airflow/config_templates/config.yml | 8 -------- airflow/config_templates/default_airflow.cfg | 4 ---- 2 files changed, 12 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 4a0296295bf94..3e083d4b90b70 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1709,14 +1709,6 @@ type: float example: ~ default: "15.0" - - name: update_dagrun_state_for_paused_dag_interval - description: | - How often (in seconds) to check paused DAGs for DagRuns in running state - and update states of DagRuns whose tasks finished the DagRun - version_added: 2.1.1 - type: float - example: ~ - default: "30.0" - name: scheduler_heartbeat_sec description: | The scheduler constantly tries to trigger new tasks (look at the diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index a886b6ed96fc1..c03e116746f96 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg 
@@ -854,10 +854,6 @@ job_heartbeat_sec = 5 # that no longer have a matching DagRun clean_tis_without_dagrun_interval = 15.0 -# How often (in seconds) to check paused DAGs for DagRuns in running state -# and update states of DagRuns whose tasks finished the DagRun -update_dagrun_state_for_paused_dag_interval = 30.0 - # The scheduler constantly tries to trigger new tasks (look at the # scheduler section in the docs for more information). This defines # how often the scheduler should run (in seconds). From a2038aa937c8df2005ef4614c18bffe8da240849 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Thu, 10 Jun 2021 19:33:08 +0100 Subject: [PATCH 05/15] fixup! fixup! Move implementation to task exit --- airflow/jobs/local_task_job.py | 12 +++++------ airflow/models/dagrun.py | 21 ------------------ tests/models/test_dagrun.py | 39 ---------------------------------- 3 files changed, 6 insertions(+), 66 deletions(-) diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py index 1e4222b85e47e..31c1e1e23706c 100644 --- a/airflow/jobs/local_task_job.py +++ b/airflow/jobs/local_task_job.py @@ -167,7 +167,7 @@ def handle_task_exit(self, return_code: int) -> None: if not self.task_instance.test_mode: if conf.getboolean('scheduler', 'schedule_after_task_execution', fallback=True): self._run_mini_scheduler_on_child_tasks() - self._update_dagrun_state_for_paused_dag() + self._update_dagrun_state_for_paused_dag(self.task_instance) def on_kill(self): self.task_runner.terminate() @@ -267,12 +267,12 @@ def _run_mini_scheduler_on_child_tasks(self, session=None) -> None: session.rollback() @provide_session - def _update_dagrun_state_for_paused_dag(self, session=None): + def _update_dagrun_state_for_paused_dag(self, task_instance, session=None): """ Checks for paused dags with DagRuns in the running state and update the DagRun state if possible """ - dag_runs = DagRun.get_running_dagruns_in_paused_dags(session=session) - for dagrun in dag_runs: - dagrun.dag = 
self.task_instance.task.dag - dagrun.update_state(session=session, execute_callbacks=False) + dag_run = task_instance.get_dagrun() + dag = dag_run.dag = task_instance.task.dag + if dag.get_is_paused(): + dag_run.update_state(session=session, execute_callbacks=False) diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index b39e5a8481f3e..e18351d3c1fb8 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -183,27 +183,6 @@ def refresh_from_db(self, session: Session = None): self.id = dr.id self.state = dr.state - @classmethod - def get_running_dagruns_in_paused_dags(cls, session: Session, max_number: Optional[int] = None): - """Returns the DagRuns that are in running state but the DAG is paused""" - from airflow.models import DagModel - - if max_number is None: - max_number = cls.DEFAULT_DAGRUNS_TO_EXAMINE - - query = ( - session.query(cls) - .filter(cls.state == State.RUNNING, cls.run_type != DagRunType.BACKFILL_JOB) - .join( - DagModel, - DagModel.dag_id == cls.dag_id, - ) - .filter(DagModel.is_paused == expression.true(), DagModel.is_active == expression.true()) - ) - return with_row_locks( - query.limit(max_number), of=cls, session=session, **skip_locked(session=session) - ) - @classmethod def next_dagruns_to_examine( cls, diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py index 90b0ba6590b49..78991999ffde1 100644 --- a/tests/models/test_dagrun.py +++ b/tests/models/test_dagrun.py @@ -731,45 +731,6 @@ def test_next_dagruns_to_examine_only_unpaused(self): session.rollback() session.close() - def test_get_running_dagruns_in_paused_dags_only_paused(self): - """ - Check that "get_running_dagruns_in_paused_dags" ignores runs from unpaused/inactive DAGs - """ - dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE) - DummyOperator(task_id='dummy', dag=dag, owner='airflow') - - session = settings.Session() - orm_dag = DagModel( - dag_id=dag.dag_id, - has_task_concurrency_limits=False, - next_dagrun=dag.start_date, - 
next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE), - is_active=True, - is_paused=True, - ) - session.add(orm_dag) - session.flush() - dr = dag.create_dagrun( - run_type=DagRunType.SCHEDULED, - state=State.RUNNING, - execution_date=DEFAULT_DATE, - start_date=DEFAULT_DATE, - session=session, - ) - - runs = DagRun.get_running_dagruns_in_paused_dags(session).all() - - assert runs == [dr] - - orm_dag.is_paused = False - session.flush() - - runs = DagRun.get_running_dagruns_in_paused_dags(session).all() - assert runs == [] - - session.rollback() - session.close() - @mock.patch.object(Stats, 'timing') def test_no_scheduling_delay_for_nonscheduled_runs(self, stats_mock): """ From 7838c553788ae4ae6f7451d7c51f78c7218427f6 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Thu, 10 Jun 2021 23:10:05 +0100 Subject: [PATCH 06/15] fixup! fixup! fixup! Move implementation to task exit --- airflow/jobs/local_task_job.py | 8 ++++---- tests/jobs/test_local_task_job.py | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py index 31c1e1e23706c..ca42a87f8096a 100644 --- a/airflow/jobs/local_task_job.py +++ b/airflow/jobs/local_task_job.py @@ -167,7 +167,7 @@ def handle_task_exit(self, return_code: int) -> None: if not self.task_instance.test_mode: if conf.getboolean('scheduler', 'schedule_after_task_execution', fallback=True): self._run_mini_scheduler_on_child_tasks() - self._update_dagrun_state_for_paused_dag(self.task_instance) + self._update_dagrun_state_for_paused_dag() def on_kill(self): self.task_runner.terminate() @@ -267,12 +267,12 @@ def _run_mini_scheduler_on_child_tasks(self, session=None) -> None: session.rollback() @provide_session - def _update_dagrun_state_for_paused_dag(self, task_instance, session=None): + def _update_dagrun_state_for_paused_dag(self, session=None): """ Checks for paused dags with DagRuns in the running state and update the DagRun state if possible """ - 
dag_run = task_instance.get_dagrun() - dag = dag_run.dag = task_instance.task.dag + dag_run = self.task_instance.get_dagrun() + dag = dag_run.dag = self.task_instance.task.dag if dag.get_is_paused(): dag_run.update_state(session=session, execute_callbacks=False) diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py index e23ac97af378a..eedf08fd8d06e 100644 --- a/tests/jobs/test_local_task_job.py +++ b/tests/jobs/test_local_task_job.py @@ -746,12 +746,12 @@ def test_number_of_queries_single_loop(self, mock_get_task_runner, return_codes) task = DummyOperator(task_id='test_state_succeeded1', dag=dag) dag.clear() - dag.create_dagrun(run_id=unique_prefix, state=State.NONE) + dag.create_dagrun(run_id=unique_prefix, execution_date=DEFAULT_DATE, state=State.NONE) ti = TaskInstance(task=task, execution_date=DEFAULT_DATE) mock_get_task_runner.return_value.return_code.side_effects = return_codes job = LocalTaskJob(task_instance=ti, executor=MockExecutor()) - with assert_queries_count(15): + with assert_queries_count(17): job.run() From fc4fbb7a0c131906b85f4da319de274937a329e7 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Fri, 11 Jun 2021 06:47:41 +0100 Subject: [PATCH 07/15] fixup! fixup! fixup! fixup! 
Move implementation to task exit --- tests/core/test_core.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/core/test_core.py b/tests/core/test_core.py index ee2df3c541767..4e25fe99421d0 100644 --- a/tests/core/test_core.py +++ b/tests/core/test_core.py @@ -299,6 +299,12 @@ def test_task_get_template(self): def test_local_task_job(self): TI = TaskInstance + self.dag_bash.create_dagrun( + run_type=DagRunType.SCHEDULED, + state=State.RUNNING, + execution_date=DEFAULT_DATE, + start_date=DEFAULT_DATE, + ) ti = TI(task=self.runme_0, execution_date=DEFAULT_DATE) job = LocalTaskJob(task_instance=ti, ignore_ti_state=True) job.run() From b029f73f97f9f6062fa9c8187627f62474ded72d Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Fri, 11 Jun 2021 10:10:30 +0100 Subject: [PATCH 08/15] fixup! fixup! fixup! fixup! fixup! Move implementation to task exit --- airflow/jobs/local_task_job.py | 7 ++++--- tests/core/test_core.py | 6 ------ 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py index ca42a87f8096a..e3c01e4276a2b 100644 --- a/airflow/jobs/local_task_job.py +++ b/airflow/jobs/local_task_job.py @@ -273,6 +273,7 @@ def _update_dagrun_state_for_paused_dag(self, session=None): update the DagRun state if possible """ dag_run = self.task_instance.get_dagrun() - dag = dag_run.dag = self.task_instance.task.dag - if dag.get_is_paused(): - dag_run.update_state(session=session, execute_callbacks=False) + if dag_run: + dag = dag_run.dag = self.task_instance.task.dag + if dag.get_is_paused(): + dag_run.update_state(session=session, execute_callbacks=False) diff --git a/tests/core/test_core.py b/tests/core/test_core.py index 4e25fe99421d0..ee2df3c541767 100644 --- a/tests/core/test_core.py +++ b/tests/core/test_core.py @@ -299,12 +299,6 @@ def test_task_get_template(self): def test_local_task_job(self): TI = TaskInstance - self.dag_bash.create_dagrun( - run_type=DagRunType.SCHEDULED, - 
state=State.RUNNING, - execution_date=DEFAULT_DATE, - start_date=DEFAULT_DATE, - ) ti = TI(task=self.runme_0, execution_date=DEFAULT_DATE) job = LocalTaskJob(task_instance=ti, ignore_ti_state=True) job.run() From c0100ced4b8f57fcba915fe3934d2e6c70bb5866 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Tue, 15 Jun 2021 09:27:45 +0100 Subject: [PATCH 09/15] Apply suggestions from code review Co-authored-by: Ash Berlin-Taylor --- airflow/jobs/local_task_job.py | 6 +++--- airflow/jobs/scheduler_job.py | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py index e3c01e4276a2b..99d074354bf45 100644 --- a/airflow/jobs/local_task_job.py +++ b/airflow/jobs/local_task_job.py @@ -272,8 +272,8 @@ def _update_dagrun_state_for_paused_dag(self, session=None): Checks for paused dags with DagRuns in the running state and update the DagRun state if possible """ - dag_run = self.task_instance.get_dagrun() + dag_run = self.task_instance.get_dagrun(session=session) if dag_run: dag = dag_run.dag = self.task_instance.task.dag - if dag.get_is_paused(): - dag_run.update_state(session=session, execute_callbacks=False) + if dag.get_is_paused(session=session): + dag_run.update_state(session=session, execute_callbacks=True) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index b1450c460d825..3b7e842f93da9 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -1344,6 +1344,7 @@ def _run_scheduler_loop(self) -> None: conf.getfloat('scheduler', 'clean_tis_without_dagrun_interval', fallback=15.0), self._clean_tis_without_dagrun, ) + for loop_count in itertools.count(start=1): with Stats.timer() as timer: From 6dd2978cd87017955d323f6f6ae5b9903d43a77b Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Tue, 15 Jun 2021 13:22:32 +0100 Subject: [PATCH 10/15] Update tests/jobs/test_local_task_job.py Co-authored-by: Ash Berlin-Taylor --- 
tests/jobs/test_local_task_job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py index eedf08fd8d06e..1f02a3d212bfe 100644 --- a/tests/jobs/test_local_task_job.py +++ b/tests/jobs/test_local_task_job.py @@ -725,7 +725,7 @@ def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self) job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor()) job1.task_runner = StandardTaskRunner(job1) job1.run() - dr = session.query(DagRun).filter(DagRun.id == dr.id).first() + session.refresh(dr) assert dr.state == State.SUCCESS From 2144c8c99f4ebe113f4da9be8e7d3fde21a5ea8e Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Mon, 14 Jun 2021 10:57:51 +0100 Subject: [PATCH 11/15] fixup! Update tests/jobs/test_local_task_job.py --- tests/jobs/test_local_task_job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py index 1f02a3d212bfe..20a48a1b358bb 100644 --- a/tests/jobs/test_local_task_job.py +++ b/tests/jobs/test_local_task_job.py @@ -36,7 +36,6 @@ from airflow.jobs.scheduler_job import SchedulerJob from airflow.models.dag import DAG, DagModel from airflow.models.dagbag import DagBag -from airflow.models.dagrun import DagRun from airflow.models.taskinstance import TaskInstance from airflow.operators.dummy import DummyOperator from airflow.operators.python import PythonOperator @@ -725,6 +724,7 @@ def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self) job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor()) job1.task_runner = StandardTaskRunner(job1) job1.run() + session.add(dr) session.refresh(dr) assert dr.state == State.SUCCESS From 7ee7d9d1c38478ec193d7e26165ad001d044ace9 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Wed, 16 Jun 2021 09:46:36 +0100 Subject: [PATCH 12/15] Update 
tests/jobs/test_local_task_job.py Co-authored-by: Ash Berlin-Taylor --- tests/jobs/test_local_task_job.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py index 20a48a1b358bb..484dbd48cdbc6 100644 --- a/tests/jobs/test_local_task_job.py +++ b/tests/jobs/test_local_task_job.py @@ -715,9 +715,6 @@ def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self) start_date=DEFAULT_DATE, session=session, ) - ti = TaskInstance(op1, dr.execution_date) - ti.refresh_from_db() - ti.state = State.SUCCESS session.commit() assert dr.state == State.RUNNING From b4fce33af401956381d1ac98831e81c8a990b866 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Wed, 16 Jun 2021 09:47:14 +0100 Subject: [PATCH 13/15] Update tests/jobs/test_local_task_job.py --- tests/jobs/test_local_task_job.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py index 484dbd48cdbc6..633dd8582bf7a 100644 --- a/tests/jobs/test_local_task_job.py +++ b/tests/jobs/test_local_task_job.py @@ -715,7 +715,6 @@ def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self) start_date=DEFAULT_DATE, session=session, ) - session.commit() assert dr.state == State.RUNNING job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor()) From 11b0de59843506a30fd2cbd97e20a637f2ff7c0c Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Tue, 15 Jun 2021 03:13:20 +0100 Subject: [PATCH 14/15] Add back ti with no commit --- tests/jobs/test_local_task_job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py index 633dd8582bf7a..4e86d73282edf 100644 --- a/tests/jobs/test_local_task_job.py +++ b/tests/jobs/test_local_task_job.py @@ -716,7 +716,7 @@ def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self) session=session, ) 
assert dr.state == State.RUNNING - + ti = TaskInstance(op1, dr.execution_date) job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor()) job1.task_runner = StandardTaskRunner(job1) job1.run() From 6e41893ffc354a627094edef39a15be600cb197e Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Wed, 16 Jun 2021 04:45:59 +0100 Subject: [PATCH 15/15] apply suggestions from code review --- airflow/jobs/local_task_job.py | 9 +++++---- tests/jobs/test_local_task_job.py | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py index 99d074354bf45..967b25bb38c2a 100644 --- a/airflow/jobs/local_task_job.py +++ b/airflow/jobs/local_task_job.py @@ -272,8 +272,9 @@ def _update_dagrun_state_for_paused_dag(self, session=None): Checks for paused dags with DagRuns in the running state and update the DagRun state if possible """ - dag_run = self.task_instance.get_dagrun(session=session) - if dag_run: - dag = dag_run.dag = self.task_instance.task.dag - if dag.get_is_paused(session=session): + dag = self.task_instance.task.dag + if dag.get_is_paused(): + dag_run = self.task_instance.get_dagrun(session=session) + if dag_run: + dag_run.dag = dag dag_run.update_state(session=session, execute_callbacks=True) diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py index 4e86d73282edf..77099cb9dc799 100644 --- a/tests/jobs/test_local_task_job.py +++ b/tests/jobs/test_local_task_job.py @@ -749,5 +749,5 @@ def test_number_of_queries_single_loop(self, mock_get_task_runner, return_codes) mock_get_task_runner.return_value.return_code.side_effects = return_codes job = LocalTaskJob(task_instance=ti, executor=MockExecutor()) - with assert_queries_count(17): + with assert_queries_count(16): job.run()