From 22d52c00f6397fde8d97cf2479c0614671f5b5ba Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Mon, 26 Sep 2022 20:59:08 +0800 Subject: [PATCH] Use COALESCE when ordering runs to handle NULL (#26626) Data interval columns are NULL for runs created before 2.3, but SQL's NULL-sorting logic would make those old runs always appear first. In a perfect world we'd want to sort by get_run_data_interval(), but that's not efficient, so instead the columns are coalesced into logical date, which is good enough in most cases. --- airflow/www/utils.py | 44 ++++++++++++++++++++++++++++++++++++++------ airflow/www/views.py | 5 +---- 2 files changed, 39 insertions(+), 10 deletions(-) diff --git a/airflow/www/utils.py b/airflow/www/utils.py index d49b73249717b..946c7cb07377f 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -20,10 +20,9 @@ import json import textwrap import time -from typing import Any +from typing import TYPE_CHECKING, Any, Sequence from urllib.parse import urlencode -import sqlalchemy as sqla from flask import request, url_for from flask.helpers import flash from flask_appbuilder.forms import FieldConverter @@ -37,11 +36,12 @@ from pendulum.datetime import DateTime from pygments import highlight, lexers from pygments.formatters import HtmlFormatter +from sqlalchemy import func, types from sqlalchemy.ext.associationproxy import AssociationProxy -from airflow import models from airflow.exceptions import RemovedInAirflow3Warning from airflow.models import errors +from airflow.models.dagrun import DagRun from airflow.models.dagwarning import DagWarning from airflow.models.taskinstance import TaskInstance from airflow.utils import timezone @@ -51,6 +51,10 @@ from airflow.www.forms import DateTimeWithTimezoneField from airflow.www.widgets import AirflowDateTimePickerWidget +if TYPE_CHECKING: + from sqlalchemy.orm.query import Query + from sqlalchemy.sql.operators import ColumnOperators + def datetime_to_string(value: DateTime | None) -> str | None: if value is None: @@ -129,7 +133,7 @@ def get_mapped_summary(parent_instance, task_instances): } -def encode_dag_run(dag_run: models.DagRun | None) -> dict[str, Any] | None: +def encode_dag_run(dag_run: DagRun | None) -> dict[str, Any] | None: if not dag_run: return None @@ -436,6 +440,34 @@ def dag_run_link(attr): return Markup('{run_id}').format(url=url, run_id=run_id) +def _get_run_ordering_expr(name: str) -> ColumnOperators: + expr = DagRun.__table__.columns[name] + # Data interval columns are NULL for runs created before 2.3, but SQL's + # NULL-sorting logic would make those old runs always appear first. In a + # perfect world we'd want to sort by ``get_run_data_interval()``, but that's + # not efficient, so instead the columns are coalesced into execution_date, + # which is good enough in most cases. + if name in ("data_interval_start", "data_interval_end"): + expr = func.coalesce(expr, DagRun.execution_date) + return expr.desc() + + +def sorted_dag_runs(query: Query, *, ordering: Sequence[str], limit: int) -> Sequence[DagRun]: + """Produce DAG runs sorted by specified columns. + + :param query: An ORM query object against *DagRun*. + :param ordering: Column names to sort the runs. should generally come from a + timetable's ``run_ordering``. + :param limit: Number of runs to limit to. + :return: A list of DagRun objects ordered by the specified columns. The list + contains only the *last* objects, but in *ascending* order. + """ + ordering_exprs = (_get_run_ordering_expr(name) for name in ordering) + runs = query.order_by(*ordering_exprs, DagRun.id.desc()).limit(limit).all() + runs.reverse() + return runs + + def format_map_index(attr: dict) -> str: """Format map index for list columns in model view.""" value = attr['map_index'] @@ -652,7 +684,7 @@ def is_utcdatetime(self, col_name): obj = self.list_columns[col_name].type return ( isinstance(obj, UtcDateTime) - or isinstance(obj, sqla.types.TypeDecorator) + or isinstance(obj, types.TypeDecorator) and isinstance(obj.impl, UtcDateTime) ) return False @@ -665,7 +697,7 @@ def is_extendedjson(self, col_name): obj = self.list_columns[col_name].type return ( isinstance(obj, ExtendedJSON) - or isinstance(obj, sqla.types.TypeDecorator) + or isinstance(obj, types.TypeDecorator) and isinstance(obj.impl, ExtendedJSON) ) return False diff --git a/airflow/www/views.py b/airflow/www/views.py index ee651b545d1d7..52daed98876d8 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -3456,10 +3456,7 @@ def grid_data(self): if run_state: query = query.filter(DagRun.state == run_state) - ordering = (DagRun.__table__.columns[name].desc() for name in dag.timetable.run_ordering) - dag_runs = query.order_by(*ordering, DagRun.id.desc()).limit(num_runs).all() - dag_runs.reverse() - + dag_runs = wwwutils.sorted_dag_runs(query, ordering=dag.timetable.run_ordering, limit=num_runs) encoded_runs = [wwwutils.encode_dag_run(dr) for dr in dag_runs] data = { 'groups': dag_to_grid(dag, dag_runs, session),