diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py index f777fd91518c3..08b7728be76cd 100644 --- a/airflow/cli/cli_parser.py +++ b/airflow/cli/cli_parser.py @@ -1170,6 +1170,7 @@ class GroupCommand(NamedTuple): args=( ARG_DAG_ID, ARG_EXECUTION_DATE, + ARG_CONF, ARG_SUBDIR, ARG_SHOW_DAGRUN, ARG_IMGCAT_DAGRUN, diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index 515dcf2d809cf..22031a148905c 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -451,6 +451,10 @@ def dag_list_dag_runs(args, dag=None, session=NEW_SESSION): @cli_utils.action_cli def dag_test(args, session=None): """Execute one single DagRun for a given DAG and execution date, using the DebugExecutor.""" + run_conf = None + if args.conf: + run_conf = json.loads(args.conf) + dag = get_dag(subdir=args.subdir, dag_id=args.dag_id) dag.clear(start_date=args.execution_date, end_date=args.execution_date, dag_run_state=False) try: @@ -458,6 +462,7 @@ def dag_test(args, session=None): executor=DebugExecutor(), start_date=args.execution_date, end_date=args.execution_date, + conf=run_conf, # Always run the DAG at least once even if no logical runs are # available. This does not make a lot of sense, but Airflow has # been doing this prior to 2.2 so we keep compatibility.