diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py index 01078aa38e8ee..96cdefc731b69 100644 --- a/airflow/cli/cli_parser.py +++ b/airflow/cli/cli_parser.py @@ -177,6 +177,9 @@ def string_lower_type(val): ARG_DAG_ID = Arg(("dag_id",), help="The id of the dag") ARG_TASK_ID = Arg(("task_id",), help="The id of the task") ARG_EXECUTION_DATE = Arg(("execution_date",), help="The execution date of the DAG", type=parsedate) +ARG_EXECUTION_DATE_OPTIONAL = Arg( + ("execution_date",), nargs='?', help="The execution date of the DAG (optional)", type=parsedate +) ARG_EXECUTION_DATE_OR_RUN_ID = Arg( ('execution_date_or_run_id',), help="The execution_date of the DAG or run_id of the DAGRun" ) @@ -1169,7 +1172,7 @@ class GroupCommand(NamedTuple): func=lazy_load_command('airflow.cli.commands.dag_command.dag_test'), args=( ARG_DAG_ID, - ARG_EXECUTION_DATE, + ARG_EXECUTION_DATE_OPTIONAL, ARG_CONF, ARG_SUBDIR, ARG_SHOW_DAGRUN, diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index ddfce4ebdbc72..447c819852099 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -39,7 +39,7 @@ from airflow.models import DagBag, DagModel, DagRun, TaskInstance from airflow.models.dag import DAG from airflow.models.serialized_dag import SerializedDagModel -from airflow.utils import cli as cli_utils +from airflow.utils import cli as cli_utils, timezone from airflow.utils.cli import get_dag, get_dags, process_subdir, sigint_handler, suppress_logs_and_warning from airflow.utils.dot_renderer import render_dag, render_dag_dependencies from airflow.utils.session import NEW_SESSION, create_session, provide_session @@ -457,14 +457,14 @@ def dag_test(args, session=None): run_conf = json.loads(args.conf) except ValueError as e: raise SystemExit(f"Configuration {args.conf!r} is not valid JSON. Error: {e}") - + execution_date = args.execution_date or timezone.utcnow() dag = get_dag(subdir=args.subdir, dag_id=args.dag_id) - dag.clear(start_date=args.execution_date, end_date=args.execution_date, dag_run_state=False) + dag.clear(start_date=execution_date, end_date=execution_date, dag_run_state=False) try: dag.run( executor=DebugExecutor(), - start_date=args.execution_date, - end_date=args.execution_date, + start_date=execution_date, + end_date=execution_date, conf=run_conf, # Always run the DAG at least once even if no logical runs are # available. This does not make a lot of sense, but Airflow has @@ -482,7 +482,7 @@ def dag_test(args, session=None): session.query(TaskInstance) .filter( TaskInstance.dag_id == args.dag_id, - TaskInstance.execution_date == args.execution_date, + TaskInstance.execution_date == execution_date, ) .all() ) diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py index 59df405629902..47762928ce286 100644 --- a/tests/cli/commands/test_dag_command.py +++ b/tests/cli/commands/test_dag_command.py @@ -25,6 +25,7 @@ from unittest import mock from unittest.mock import MagicMock +import pendulum import pytest from airflow import settings @@ -614,6 +615,36 @@ def test_dag_test(self, mock_get_dag, mock_executor): ] ) + @mock.patch("airflow.cli.commands.dag_command.DebugExecutor") + @mock.patch("airflow.cli.commands.dag_command.get_dag") + @mock.patch("airflow.utils.timezone.utcnow") + def test_dag_test_no_execution_date(self, mock_utcnow, mock_get_dag, mock_executor): + now = pendulum.now() + mock_utcnow.return_value = now + cli_args = self.parser.parse_args(['dags', 'test', 'example_bash_operator']) + + assert cli_args.execution_date is None + + dag_command.dag_test(cli_args) + + mock_get_dag.assert_has_calls( + [ + mock.call(subdir=cli_args.subdir, dag_id='example_bash_operator'), + mock.call().clear( + start_date=now, + end_date=now, + dag_run_state=False, + ), + mock.call().run( + executor=mock_executor.return_value, + start_date=now, + end_date=now, + conf=None, + run_at_least_once=True, + ), + ] + ) + @mock.patch("airflow.cli.commands.dag_command.DebugExecutor") @mock.patch("airflow.cli.commands.dag_command.get_dag") def test_dag_test_conf(self, mock_get_dag, mock_executor):