Skip to content

Commit

Permalink
Handle mapped tasks in task duration chart (#26722)
Browse files (browse the repository at this point in the history)
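Previously, each mapped task instance appeared in the task duration chart as its own independent but unlabeled point. It makes more sense to aggregate them: mapped task instances are now grouped by (dag_id, task_id, run_id), their durations are summed into a single point per run, and the series name for a mapped task is suffixed with `[]`.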

(cherry picked from commit 8829c07)
  • Loading branch information
dstandish authored and ephraimbuddy committed Oct 18, 2022
1 parent ecd9439 commit fa62205
Showing 1 changed file with 16 additions and 10 deletions.
26 changes: 16 additions & 10 deletions airflow/www/views.py
Expand Up @@ -2900,7 +2900,6 @@ def duration(self, dag_id, session=None):

y_points = defaultdict(list)
x_points = defaultdict(list)
cumulative_y = defaultdict(list)

task_instances = dag.get_task_instances_before(base_date, num_runs, session=session)
if task_instances:
Expand All @@ -2918,7 +2917,6 @@ def duration(self, dag_id, session=None):
)
if dag.partial:
ti_fails = ti_fails.filter(TaskFail.task_id.in_([t.task_id for t in dag.tasks]))

fails_totals = defaultdict(int)
for failed_task_instance in ti_fails:
dict_key = (
Expand All @@ -2929,13 +2927,21 @@ def duration(self, dag_id, session=None):
if failed_task_instance.duration:
fails_totals[dict_key] += failed_task_instance.duration

for task_instance in task_instances:
if task_instance.duration:
date_time = wwwutils.epoch(task_instance.execution_date)
x_points[task_instance.task_id].append(date_time)
fails_dict_key = (task_instance.dag_id, task_instance.task_id, task_instance.run_id)
# we must group any mapped TIs by dag_id, task_id, run_id
mapped_tis = set()
tis_grouped = itertools.groupby(task_instances, lambda x: (x.dag_id, x.task_id, x.run_id))
for key, tis in tis_grouped:
tis = list(tis)
duration = sum(x.duration for x in tis if x.duration)
if duration:
first_ti = tis[0]
if first_ti.map_index >= 0:
mapped_tis.add(first_ti.task_id)
date_time = wwwutils.epoch(first_ti.execution_date)
x_points[first_ti.task_id].append(date_time)
fails_dict_key = (first_ti.dag_id, first_ti.task_id, first_ti.run_id)
fails_total = fails_totals[fails_dict_key]
y_points[task_instance.task_id].append(float(task_instance.duration + fails_total))
y_points[first_ti.task_id].append(float(duration + fails_total))

cumulative_y = {k: list(itertools.accumulate(v)) for k, v in y_points.items()}

Expand All @@ -2951,12 +2957,12 @@ def duration(self, dag_id, session=None):

for task_id in x_points:
chart.add_serie(
name=task_id,
name=task_id + '[]' if task_id in mapped_tis else task_id,
x=x_points[task_id],
y=scale_time_units(y_points[task_id], y_unit),
)
cum_chart.add_serie(
name=task_id,
name=task_id + '[]' if task_id in mapped_tis else task_id,
x=x_points[task_id],
y=scale_time_units(cumulative_y[task_id], cum_y_unit),
)
Expand Down

0 comments on commit fa62205

Please sign in to comment.