diff --git a/airflow/www/views.py b/airflow/www/views.py index ce322d7a6287d..8a45a15444b25 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -2900,7 +2900,6 @@ def duration(self, dag_id, session=None): y_points = defaultdict(list) x_points = defaultdict(list) - cumulative_y = defaultdict(list) task_instances = dag.get_task_instances_before(base_date, num_runs, session=session) if task_instances: @@ -2918,7 +2917,6 @@ def duration(self, dag_id, session=None): ) if dag.partial: ti_fails = ti_fails.filter(TaskFail.task_id.in_([t.task_id for t in dag.tasks])) - fails_totals = defaultdict(int) for failed_task_instance in ti_fails: dict_key = ( @@ -2929,13 +2927,21 @@ def duration(self, dag_id, session=None): if failed_task_instance.duration: fails_totals[dict_key] += failed_task_instance.duration - for task_instance in task_instances: - if task_instance.duration: - date_time = wwwutils.epoch(task_instance.execution_date) - x_points[task_instance.task_id].append(date_time) - fails_dict_key = (task_instance.dag_id, task_instance.task_id, task_instance.run_id) + # we must group any mapped TIs by dag_id, task_id, run_id + mapped_tis = set() + tis_grouped = itertools.groupby(task_instances, lambda x: (x.dag_id, x.task_id, x.run_id)) + for key, tis in tis_grouped: + tis = list(tis) + duration = sum(x.duration for x in tis if x.duration) + if duration: + first_ti = tis[0] + if first_ti.map_index >= 0: + mapped_tis.add(first_ti.task_id) + date_time = wwwutils.epoch(first_ti.execution_date) + x_points[first_ti.task_id].append(date_time) + fails_dict_key = (first_ti.dag_id, first_ti.task_id, first_ti.run_id) fails_total = fails_totals[fails_dict_key] - y_points[task_instance.task_id].append(float(task_instance.duration + fails_total)) + y_points[first_ti.task_id].append(float(duration + fails_total)) cumulative_y = {k: list(itertools.accumulate(v)) for k, v in y_points.items()} @@ -2951,12 +2957,12 @@ def duration(self, dag_id, session=None): for task_id in x_points: chart.add_serie( - name=task_id, + name=task_id + '[]' if task_id in mapped_tis else task_id, x=x_points[task_id], y=scale_time_units(y_points[task_id], y_unit), ) cum_chart.add_serie( - name=task_id, + name=task_id + '[]' if task_id in mapped_tis else task_id, x=x_points[task_id], y=scale_time_units(cumulative_y[task_id], cum_y_unit), )