Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add conf parameter to CLI for airflow dags test #25900

Merged
1 change: 1 addition & 0 deletions airflow/cli/cli_parser.py
Expand Up @@ -1170,6 +1170,7 @@ class GroupCommand(NamedTuple):
args=(
ARG_DAG_ID,
ARG_EXECUTION_DATE,
ARG_CONF,
ARG_SUBDIR,
ARG_SHOW_DAGRUN,
ARG_IMGCAT_DAGRUN,
Expand Down
8 changes: 8 additions & 0 deletions airflow/cli/commands/dag_command.py
Expand Up @@ -451,13 +451,21 @@ def dag_list_dag_runs(args, dag=None, session=NEW_SESSION):
@cli_utils.action_cli
def dag_test(args, session=None):
"""Execute one single DagRun for a given DAG and execution date, using the DebugExecutor."""
run_conf = None
if args.conf:
try:
run_conf = json.loads(args.conf)
except ValueError as e:
raise SystemExit(f"Configuration '{args.conf}' is not valid JSON. Error: {e}")
uranusjr marked this conversation as resolved.
Show resolved Hide resolved

dag = get_dag(subdir=args.subdir, dag_id=args.dag_id)
dag.clear(start_date=args.execution_date, end_date=args.execution_date, dag_run_state=False)
try:
dag.run(
executor=DebugExecutor(),
start_date=args.execution_date,
end_date=args.execution_date,
conf=run_conf,
# Always run the DAG at least once even if no logical runs are
# available. This does not make a lot of sense, but Airflow has
# been doing this prior to 2.2 so we keep compatibility.
Expand Down
36 changes: 36 additions & 0 deletions tests/cli/commands/test_dag_command.py
Expand Up @@ -17,6 +17,7 @@
# under the License.
import contextlib
import io
import json
import os
import tempfile
import unittest
Expand Down Expand Up @@ -607,6 +608,40 @@ def test_dag_test(self, mock_get_dag, mock_executor):
executor=mock_executor.return_value,
start_date=cli_args.execution_date,
end_date=cli_args.execution_date,
conf=None,
run_at_least_once=True,
),
]
)

@mock.patch("airflow.cli.commands.dag_command.DebugExecutor")
@mock.patch("airflow.cli.commands.dag_command.get_dag")
def test_dag_test_conf(self, mock_get_dag, mock_executor):
cli_args = self.parser.parse_args(
[
'dags',
'test',
'example_bash_operator',
DEFAULT_DATE.isoformat(),
"-c",
"{\"dag_run_conf_param\": \"param_value\"}",
]
)
dag_command.dag_test(cli_args)

mock_get_dag.assert_has_calls(
[
mock.call(subdir=cli_args.subdir, dag_id='example_bash_operator'),
mock.call().clear(
start_date=cli_args.execution_date,
end_date=cli_args.execution_date,
dag_run_state=False,
),
mock.call().run(
executor=mock_executor.return_value,
start_date=cli_args.execution_date,
end_date=cli_args.execution_date,
conf=json.loads(cli_args.conf),
run_at_least_once=True,
),
]
Expand Down Expand Up @@ -636,6 +671,7 @@ def test_dag_test_show_dag(self, mock_get_dag, mock_executor, mock_render_dag):
executor=mock_executor.return_value,
start_date=cli_args.execution_date,
end_date=cli_args.execution_date,
conf=None,
run_at_least_once=True,
),
]
Expand Down