Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make execution_date optional for command dags test #26111

Merged
merged 2 commits into from Sep 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion airflow/cli/cli_parser.py
Expand Up @@ -177,6 +177,9 @@ def string_lower_type(val):
ARG_DAG_ID = Arg(("dag_id",), help="The id of the dag")
ARG_TASK_ID = Arg(("task_id",), help="The id of the task")
ARG_EXECUTION_DATE = Arg(("execution_date",), help="The execution date of the DAG", type=parsedate)
ARG_EXECUTION_DATE_OPTIONAL = Arg(
("execution_date",), nargs='?', help="The execution date of the DAG (optional)", type=parsedate
)
ARG_EXECUTION_DATE_OR_RUN_ID = Arg(
('execution_date_or_run_id',), help="The execution_date of the DAG or run_id of the DAGRun"
)
Expand Down Expand Up @@ -1169,7 +1172,7 @@ class GroupCommand(NamedTuple):
func=lazy_load_command('airflow.cli.commands.dag_command.dag_test'),
args=(
ARG_DAG_ID,
ARG_EXECUTION_DATE,
ARG_EXECUTION_DATE_OPTIONAL,
ARG_CONF,
ARG_SUBDIR,
ARG_SHOW_DAGRUN,
Expand Down
12 changes: 6 additions & 6 deletions airflow/cli/commands/dag_command.py
Expand Up @@ -39,7 +39,7 @@
from airflow.models import DagBag, DagModel, DagRun, TaskInstance
from airflow.models.dag import DAG
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils import cli as cli_utils
from airflow.utils import cli as cli_utils, timezone
from airflow.utils.cli import get_dag, get_dags, process_subdir, sigint_handler, suppress_logs_and_warning
from airflow.utils.dot_renderer import render_dag, render_dag_dependencies
from airflow.utils.session import NEW_SESSION, create_session, provide_session
Expand Down Expand Up @@ -457,14 +457,14 @@ def dag_test(args, session=None):
run_conf = json.loads(args.conf)
except ValueError as e:
raise SystemExit(f"Configuration {args.conf!r} is not valid JSON. Error: {e}")

execution_date = args.execution_date or timezone.utcnow()
dag = get_dag(subdir=args.subdir, dag_id=args.dag_id)
dag.clear(start_date=args.execution_date, end_date=args.execution_date, dag_run_state=False)
dag.clear(start_date=execution_date, end_date=execution_date, dag_run_state=False)
try:
dag.run(
executor=DebugExecutor(),
start_date=args.execution_date,
end_date=args.execution_date,
start_date=execution_date,
end_date=execution_date,
conf=run_conf,
# Always run the DAG at least once even if no logical runs are
# available. This does not make a lot of sense, but Airflow has
Expand All @@ -482,7 +482,7 @@ def dag_test(args, session=None):
session.query(TaskInstance)
.filter(
TaskInstance.dag_id == args.dag_id,
TaskInstance.execution_date == args.execution_date,
TaskInstance.execution_date == execution_date,
)
.all()
)
Expand Down
31 changes: 31 additions & 0 deletions tests/cli/commands/test_dag_command.py
Expand Up @@ -25,6 +25,7 @@
from unittest import mock
from unittest.mock import MagicMock

import pendulum
import pytest

from airflow import settings
Expand Down Expand Up @@ -614,6 +615,36 @@ def test_dag_test(self, mock_get_dag, mock_executor):
]
)

@mock.patch("airflow.cli.commands.dag_command.DebugExecutor")
@mock.patch("airflow.cli.commands.dag_command.get_dag")
@mock.patch("airflow.utils.timezone.utcnow")
def test_dag_test_no_execution_date(self, mock_utcnow, mock_get_dag, mock_executor):
now = pendulum.now()
mock_utcnow.return_value = now
cli_args = self.parser.parse_args(['dags', 'test', 'example_bash_operator'])

assert cli_args.execution_date is None

dag_command.dag_test(cli_args)

mock_get_dag.assert_has_calls(
[
mock.call(subdir=cli_args.subdir, dag_id='example_bash_operator'),
mock.call().clear(
start_date=now,
end_date=now,
dag_run_state=False,
),
mock.call().run(
executor=mock_executor.return_value,
start_date=now,
end_date=now,
conf=None,
run_at_least_once=True,
),
]
)

@mock.patch("airflow.cli.commands.dag_command.DebugExecutor")
@mock.patch("airflow.cli.commands.dag_command.get_dag")
def test_dag_test_conf(self, mock_get_dag, mock_executor):
Expand Down