From a665f48b606065977e0d3952bc74635ce11726d1 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Fri, 21 Jan 2022 21:44:40 +0800 Subject: [PATCH] Fix session usage in ``/rendered-k8s`` view (#21006) We can't commit the session too early because later functions need that session to fetch related objects. Fix #20534. --- airflow/www/views.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/airflow/www/views.py b/airflow/www/views.py index 8544dbac4b9ba..a17f98eefb8fb 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -84,7 +84,7 @@ from pygments.formatters import HtmlFormatter from sqlalchemy import Date, and_, desc, func, inspect, union_all from sqlalchemy.exc import IntegrityError -from sqlalchemy.orm import joinedload +from sqlalchemy.orm import Session, joinedload from wtforms import SelectField, validators from wtforms.validators import InputRequired @@ -114,7 +114,7 @@ from airflow.utils.helpers import alchemy_to_dict from airflow.utils.log import secrets_masker from airflow.utils.log.log_reader import TaskLogReader -from airflow.utils.session import create_session, provide_session +from airflow.utils.session import NEW_SESSION, create_session, provide_session from airflow.utils.state import State, TaskInstanceState from airflow.utils.strings import to_boolean from airflow.utils.timezone import td_format, utcnow @@ -1228,7 +1228,8 @@ def rendered_templates(self, session): ] ) @action_logging - def rendered_k8s(self): + @provide_session + def rendered_k8s(self, session: Session = NEW_SESSION): """Get rendered k8s yaml.""" if not settings.IS_K8S_OR_K8SCELERY_EXECUTOR: abort(404) @@ -1239,14 +1240,15 @@ def rendered_k8s(self): form = DateTimeForm(data={'execution_date': dttm}) root = request.args.get('root', '') logging.info("Retrieving rendered templates.") - dag = current_app.dag_bag.get_dag(dag_id) + + dag: DAG = current_app.dag_bag.get_dag(dag_id) task = dag.get_task(task_id) - dag_run = dag.get_dagrun(execution_date=dttm) - ti = dag_run.get_task_instance(task_id=task.task_id) + dag_run = dag.get_dagrun(execution_date=dttm, session=session) + ti = dag_run.get_task_instance(task_id=task.task_id, session=session) pod_spec = None try: - pod_spec = ti.get_rendered_k8s_spec() + pod_spec = ti.get_rendered_k8s_spec(session=session) except AirflowException as e: msg = "Error rendering Kubernetes POD Spec: " + escape(e) if e.__cause__: