Skip to content

Commit

Permalink
Use COALESCE when ordering runs to handle NULL (#26626)
Browse files Browse the repository at this point in the history
Data interval columns are NULL for runs created before 2.3, but SQL's
NULL-sorting logic would make those old runs always appear first. In a
perfect world we'd want to sort by get_run_data_interval(), but that's
not efficient, so instead the columns are coalesced into logical date,
which is good enough in most cases.
  • Loading branch information
uranusjr committed Sep 26, 2022
1 parent e5c903c commit 22d52c0
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 10 deletions.
44 changes: 38 additions & 6 deletions airflow/www/utils.py
Expand Up @@ -20,10 +20,9 @@
import json
import textwrap
import time
from typing import Any
from typing import TYPE_CHECKING, Any, Sequence
from urllib.parse import urlencode

import sqlalchemy as sqla
from flask import request, url_for
from flask.helpers import flash
from flask_appbuilder.forms import FieldConverter
Expand All @@ -37,11 +36,12 @@
from pendulum.datetime import DateTime
from pygments import highlight, lexers
from pygments.formatters import HtmlFormatter
from sqlalchemy import func, types
from sqlalchemy.ext.associationproxy import AssociationProxy

from airflow import models
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.models import errors
from airflow.models.dagrun import DagRun
from airflow.models.dagwarning import DagWarning
from airflow.models.taskinstance import TaskInstance
from airflow.utils import timezone
Expand All @@ -51,6 +51,10 @@
from airflow.www.forms import DateTimeWithTimezoneField
from airflow.www.widgets import AirflowDateTimePickerWidget

if TYPE_CHECKING:
from sqlalchemy.orm.query import Query
from sqlalchemy.sql.operators import ColumnOperators


def datetime_to_string(value: DateTime | None) -> str | None:
if value is None:
Expand Down Expand Up @@ -129,7 +133,7 @@ def get_mapped_summary(parent_instance, task_instances):
}


def encode_dag_run(dag_run: models.DagRun | None) -> dict[str, Any] | None:
def encode_dag_run(dag_run: DagRun | None) -> dict[str, Any] | None:
if not dag_run:
return None

Expand Down Expand Up @@ -436,6 +440,34 @@ def dag_run_link(attr):
return Markup('<a href="{url}">{run_id}</a>').format(url=url, run_id=run_id)


def _get_run_ordering_expr(name: str) -> ColumnOperators:
expr = DagRun.__table__.columns[name]
# Data interval columns are NULL for runs created before 2.3, but SQL's
# NULL-sorting logic would make those old runs always appear first. In a
# perfect world we'd want to sort by ``get_run_data_interval()``, but that's
# not efficient, so instead the columns are coalesced into execution_date,
# which is good enough in most cases.
if name in ("data_interval_start", "data_interval_end"):
expr = func.coalesce(expr, DagRun.execution_date)
return expr.desc()


def sorted_dag_runs(query: Query, *, ordering: Sequence[str], limit: int) -> Sequence[DagRun]:
"""Produce DAG runs sorted by specified columns.
:param query: An ORM query object against *DagRun*.
:param ordering: Column names to sort the runs. should generally come from a
timetable's ``run_ordering``.
:param limit: Number of runs to limit to.
:return: A list of DagRun objects ordered by the specified columns. The list
contains only the *last* objects, but in *ascending* order.
"""
ordering_exprs = (_get_run_ordering_expr(name) for name in ordering)
runs = query.order_by(*ordering_exprs, DagRun.id.desc()).limit(limit).all()
runs.reverse()
return runs


def format_map_index(attr: dict) -> str:
"""Format map index for list columns in model view."""
value = attr['map_index']
Expand Down Expand Up @@ -652,7 +684,7 @@ def is_utcdatetime(self, col_name):
obj = self.list_columns[col_name].type
return (
isinstance(obj, UtcDateTime)
or isinstance(obj, sqla.types.TypeDecorator)
or isinstance(obj, types.TypeDecorator)
and isinstance(obj.impl, UtcDateTime)
)
return False
Expand All @@ -665,7 +697,7 @@ def is_extendedjson(self, col_name):
obj = self.list_columns[col_name].type
return (
isinstance(obj, ExtendedJSON)
or isinstance(obj, sqla.types.TypeDecorator)
or isinstance(obj, types.TypeDecorator)
and isinstance(obj.impl, ExtendedJSON)
)
return False
Expand Down
5 changes: 1 addition & 4 deletions airflow/www/views.py
Expand Up @@ -3456,10 +3456,7 @@ def grid_data(self):
if run_state:
query = query.filter(DagRun.state == run_state)

ordering = (DagRun.__table__.columns[name].desc() for name in dag.timetable.run_ordering)
dag_runs = query.order_by(*ordering, DagRun.id.desc()).limit(num_runs).all()
dag_runs.reverse()

dag_runs = wwwutils.sorted_dag_runs(query, ordering=dag.timetable.run_ordering, limit=num_runs)
encoded_runs = [wwwutils.encode_dag_run(dr) for dr in dag_runs]
data = {
'groups': dag_to_grid(dag, dag_runs, session),
Expand Down

0 comments on commit 22d52c0

Please sign in to comment.