From d660752e783f97fd1ba3bc28da0841a163a37210 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 27 Sep 2022 11:26:38 -0700 Subject: [PATCH] Handle mapped tasks in task duration view Previously mapped tasks each existed as their own independent-but-unlabeled points. Makes sense to aggregate them somehow. --- airflow/www/views.py | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/airflow/www/views.py b/airflow/www/views.py index be8dd1ffa2da3..0cf35fba3a595 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -2915,7 +2915,6 @@ def duration(self, dag_id, session=None): ) if dag.partial: ti_fails = ti_fails.filter(TaskFail.task_id.in_([t.task_id for t in dag.tasks])) - fails_totals = defaultdict(int) for failed_task_instance in ti_fails: dict_key = ( @@ -2926,13 +2925,21 @@ def duration(self, dag_id, session=None): if failed_task_instance.duration: fails_totals[dict_key] += failed_task_instance.duration - for task_instance in task_instances: - if task_instance.duration: - date_time = wwwutils.epoch(task_instance.execution_date) - x_points[task_instance.task_id].append(date_time) - fails_dict_key = (task_instance.dag_id, task_instance.task_id, task_instance.run_id) + # we must group any mapped TIs by dag_id, task_id, run_id + mapped_tis = set() + tis_grouped = itertools.groupby(task_instances, lambda x: (x.dag_id, x.task_id, x.run_id)) + for key, tis in tis_grouped: + tis = list(tis) + duration = sum(x.duration for x in tis if x.duration) + if duration: + first_ti = tis[0] + if first_ti.map_index >= 0: + mapped_tis.add(first_ti.task_id) + date_time = wwwutils.epoch(first_ti.execution_date) + x_points[first_ti.task_id].append(date_time) + fails_dict_key = (first_ti.dag_id, first_ti.task_id, first_ti.run_id) fails_total = fails_totals[fails_dict_key] - y_points[task_instance.task_id].append(float(task_instance.duration + fails_total)) + y_points[first_ti.task_id].append(float(duration + fails_total)) cumulative_y = {k: list(itertools.accumulate(v)) for k, v in y_points.items()} @@ -2948,12 +2955,12 @@ def duration(self, dag_id, session=None): for task_id in x_points: chart.add_serie( - name=task_id, + name=task_id + '[]' if task_id in mapped_tis else task_id, x=x_points[task_id], y=scale_time_units(y_points[task_id], y_unit), ) cum_chart.add_serie( - name=task_id, + name=task_id + '[]' if task_id in mapped_tis else task_id, x=x_points[task_id], y=scale_time_units(cumulative_y[task_id], cum_y_unit), )