From 745fbdf20e1b9182b15cb5a4a00f688ddfbc31e1 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Thu, 1 Sep 2022 09:06:45 -0700 Subject: [PATCH 1/2] Make execution_date optional for command `dags test` --- airflow/cli/cli_parser.py | 2 +- airflow/cli/commands/dag_command.py | 12 +++++----- tests/cli/commands/test_dag_command.py | 31 ++++++++++++++++++++++++++ 3 files changed, 38 insertions(+), 7 deletions(-) diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py index 01078aa38e8ee..93b3fd85c5641 100644 --- a/airflow/cli/cli_parser.py +++ b/airflow/cli/cli_parser.py @@ -176,7 +176,7 @@ def string_lower_type(val): # Shared ARG_DAG_ID = Arg(("dag_id",), help="The id of the dag") ARG_TASK_ID = Arg(("task_id",), help="The id of the task") -ARG_EXECUTION_DATE = Arg(("execution_date",), help="The execution date of the DAG", type=parsedate) +ARG_EXECUTION_DATE = Arg(("execution_date",), nargs='?', help="The execution date of the DAG", type=parsedate) ARG_EXECUTION_DATE_OR_RUN_ID = Arg( ('execution_date_or_run_id',), help="The execution_date of the DAG or run_id of the DAGRun" ) diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index ddfce4ebdbc72..447c819852099 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -39,7 +39,7 @@ from airflow.models import DagBag, DagModel, DagRun, TaskInstance from airflow.models.dag import DAG from airflow.models.serialized_dag import SerializedDagModel -from airflow.utils import cli as cli_utils +from airflow.utils import cli as cli_utils, timezone from airflow.utils.cli import get_dag, get_dags, process_subdir, sigint_handler, suppress_logs_and_warning from airflow.utils.dot_renderer import render_dag, render_dag_dependencies from airflow.utils.session import NEW_SESSION, create_session, provide_session @@ -457,14 +457,14 @@ def dag_test(args, session=None): run_conf = json.loads(args.conf) except ValueError as e: raise SystemExit(f"Configuration {args.conf!r} is not valid JSON. Error: {e}") - + execution_date = args.execution_date or timezone.utcnow() dag = get_dag(subdir=args.subdir, dag_id=args.dag_id) - dag.clear(start_date=args.execution_date, end_date=args.execution_date, dag_run_state=False) + dag.clear(start_date=execution_date, end_date=execution_date, dag_run_state=False) try: dag.run( executor=DebugExecutor(), - start_date=args.execution_date, - end_date=args.execution_date, + start_date=execution_date, + end_date=execution_date, conf=run_conf, # Always run the DAG at least once even if no logical runs are # available. This does not make a lot of sense, but Airflow has @@ -482,7 +482,7 @@ def dag_test(args, session=None): session.query(TaskInstance) .filter( TaskInstance.dag_id == args.dag_id, - TaskInstance.execution_date == args.execution_date, + TaskInstance.execution_date == execution_date, ) .all() ) diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py index 59df405629902..47762928ce286 100644 --- a/tests/cli/commands/test_dag_command.py +++ b/tests/cli/commands/test_dag_command.py @@ -25,6 +25,7 @@ from unittest import mock from unittest.mock import MagicMock +import pendulum import pytest from airflow import settings @@ -614,6 +615,36 @@ def test_dag_test(self, mock_get_dag, mock_executor): ] ) + @mock.patch("airflow.cli.commands.dag_command.DebugExecutor") + @mock.patch("airflow.cli.commands.dag_command.get_dag") + @mock.patch("airflow.utils.timezone.utcnow") + def test_dag_test_no_execution_date(self, mock_utcnow, mock_get_dag, mock_executor): + now = pendulum.now() + mock_utcnow.return_value = now + cli_args = self.parser.parse_args(['dags', 'test', 'example_bash_operator']) + + assert cli_args.execution_date is None + + dag_command.dag_test(cli_args) + + mock_get_dag.assert_has_calls( + [ + mock.call(subdir=cli_args.subdir, dag_id='example_bash_operator'), + mock.call().clear( + start_date=now, + end_date=now, + dag_run_state=False, + ), + mock.call().run( + executor=mock_executor.return_value, + start_date=now, + end_date=now, + conf=None, + run_at_least_once=True, + ), + ] + ) + @mock.patch("airflow.cli.commands.dag_command.DebugExecutor") @mock.patch("airflow.cli.commands.dag_command.get_dag") def test_dag_test_conf(self, mock_get_dag, mock_executor): From 222346a4757d41fa0c88b1d47db66137f12d7e38 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Thu, 1 Sep 2022 09:48:55 -0700 Subject: [PATCH 2/2] fixup! Make execution_date optional for command `dags test` --- airflow/cli/cli_parser.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py index 93b3fd85c5641..96cdefc731b69 100644 --- a/airflow/cli/cli_parser.py +++ b/airflow/cli/cli_parser.py @@ -176,7 +176,10 @@ def string_lower_type(val): # Shared ARG_DAG_ID = Arg(("dag_id",), help="The id of the dag") ARG_TASK_ID = Arg(("task_id",), help="The id of the task") -ARG_EXECUTION_DATE = Arg(("execution_date",), nargs='?', help="The execution date of the DAG", type=parsedate) +ARG_EXECUTION_DATE = Arg(("execution_date",), help="The execution date of the DAG", type=parsedate) +ARG_EXECUTION_DATE_OPTIONAL = Arg( + ("execution_date",), nargs='?', help="The execution date of the DAG (optional)", type=parsedate +) ARG_EXECUTION_DATE_OR_RUN_ID = Arg( ('execution_date_or_run_id',), help="The execution_date of the DAG or run_id of the DAGRun" ) @@ -1169,7 +1172,7 @@ class GroupCommand(NamedTuple): func=lazy_load_command('airflow.cli.commands.dag_command.dag_test'), args=( ARG_DAG_ID, - ARG_EXECUTION_DATE, + ARG_EXECUTION_DATE_OPTIONAL, ARG_CONF, ARG_SUBDIR, ARG_SHOW_DAGRUN,