Skip to content

Commit

Permalink
Move user-facing string to template (#26815)
Browse files Browse the repository at this point in the history
Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
(cherry picked from commit bab6dbe)
  • Loading branch information
blag authored and ephraimbuddy committed Oct 18, 2022
1 parent 9e31a11 commit 9bcf5b3
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 7 deletions.
9 changes: 6 additions & 3 deletions airflow/models/dag.py
Expand Up @@ -198,13 +198,16 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
return query.first()


def get_dataset_triggered_next_run_info(dag_ids: list[str], *, session: Session) -> dict[str, str]:
def get_dataset_triggered_next_run_info(dag_ids: list[str], *, session: Session) -> dict[str, dict[str, int]]:
"""
Given a list of dag_ids, get string representing how close any that are dataset triggered are
their next run, e.g. "1 of 2 datasets updated"
"""
return {
x.dag_id: f"{x.ready} of {x.total} datasets updated"
x.dag_id: {
"ready": x.ready,
"total": x.total,
}
for x in session.query(
DagScheduleDatasetReference.dag_id,
func.count().label("total"),
Expand Down Expand Up @@ -3338,7 +3341,7 @@ def calculate_dagrun_date_fields(
)

@provide_session
def get_dataset_triggered_next_run_info(self, *, session=NEW_SESSION) -> str | None:
def get_dataset_triggered_next_run_info(self, *, session=NEW_SESSION) -> dict[str, int] | None:
if self.schedule_interval != "Dataset":
return None
return get_dataset_triggered_next_run_info([self.dag_id], session=session)[self.dag_id]
Expand Down
4 changes: 3 additions & 1 deletion airflow/www/templates/airflow/dags.html
Expand Up @@ -308,7 +308,9 @@ <h2>{{ page_title }}</h2>
data-dag-id="{{ dag.dag_id }}"
data-summary="{{ dataset_triggered_next_run_info[dag.dag_id] }}"
>
{{ dataset_triggered_next_run_info[dag.dag_id] }}
{% with ds_info = dataset_triggered_next_run_info[dag.dag_id] %}
{{ ds_info.ready }} of {{ ds_info.total }} datasets updated
{% endwith %}
</div>
</span>
{% endif %}
Expand Down
15 changes: 12 additions & 3 deletions tests/models/test_dag.py
Expand Up @@ -2953,12 +2953,21 @@ def test_get_dataset_triggered_next_run_info(dag_maker):
session.flush()

info = get_dataset_triggered_next_run_info([dag1.dag_id], session=session)
assert "0 of 1 datasets updated" == info[dag1.dag_id]
assert info[dag1.dag_id] == {
"ready": 0,
"total": 1,
}

# This time, check both dag2 and dag3 at the same time (tests filtering)
info = get_dataset_triggered_next_run_info([dag2.dag_id, dag3.dag_id], session=session)
assert "1 of 2 datasets updated" == info[dag2.dag_id]
assert "1 of 3 datasets updated" == info[dag3.dag_id]
assert info[dag2.dag_id] == {
"ready": 1,
"total": 2,
}
assert info[dag3.dag_id] == {
"ready": 1,
"total": 3,
}


def test_dag_uses_timetable_for_run_id(session):
Expand Down

0 comments on commit 9bcf5b3

Please sign in to comment.