Skip to content

Commit

Permalink
Handle mapped tasks in task duration view
Browse files Browse the repository at this point in the history
Previously mapped tasks each existed as their own independent-but-unlabeled points.  Makes sense to aggregate them somehow.
  • Loading branch information
dstandish committed Sep 28, 2022
1 parent cf32cec commit d660752
Showing 1 changed file with 16 additions and 9 deletions.
25 changes: 16 additions & 9 deletions airflow/www/views.py
Expand Up @@ -2915,7 +2915,6 @@ def duration(self, dag_id, session=None):
)
if dag.partial:
ti_fails = ti_fails.filter(TaskFail.task_id.in_([t.task_id for t in dag.tasks]))

fails_totals = defaultdict(int)
for failed_task_instance in ti_fails:
dict_key = (
Expand All @@ -2926,13 +2925,21 @@ def duration(self, dag_id, session=None):
if failed_task_instance.duration:
fails_totals[dict_key] += failed_task_instance.duration

for task_instance in task_instances:
if task_instance.duration:
date_time = wwwutils.epoch(task_instance.execution_date)
x_points[task_instance.task_id].append(date_time)
fails_dict_key = (task_instance.dag_id, task_instance.task_id, task_instance.run_id)
# we must group any mapped TIs by dag_id, task_id, run_id
mapped_tis = set()
tis_grouped = itertools.groupby(task_instances, lambda x: (x.dag_id, x.task_id, x.run_id))
for key, tis in tis_grouped:
tis = list(tis)
duration = sum(x.duration for x in tis if x.duration)
if duration:
first_ti = tis[0]
if first_ti.map_index >= 0:
mapped_tis.add(first_ti.task_id)
date_time = wwwutils.epoch(first_ti.execution_date)
x_points[first_ti.task_id].append(date_time)
fails_dict_key = (first_ti.dag_id, first_ti.task_id, first_ti.run_id)
fails_total = fails_totals[fails_dict_key]
y_points[task_instance.task_id].append(float(task_instance.duration + fails_total))
y_points[first_ti.task_id].append(float(duration + fails_total))

cumulative_y = {k: list(itertools.accumulate(v)) for k, v in y_points.items()}

Expand All @@ -2948,12 +2955,12 @@ def duration(self, dag_id, session=None):

for task_id in x_points:
chart.add_serie(
name=task_id,
name=task_id + '[]' if task_id in mapped_tis else task_id,
x=x_points[task_id],
y=scale_time_units(y_points[task_id], y_unit),
)
cum_chart.add_serie(
name=task_id,
name=task_id + '[]' if task_id in mapped_tis else task_id,
x=x_points[task_id],
y=scale_time_units(cumulative_y[task_id], cum_y_unit),
)
Expand Down

0 comments on commit d660752

Please sign in to comment.