From 24063fd7eff1182a9c3893cb4bc6e7c80e240fc0 Mon Sep 17 00:00:00 2001 From: Aleksej Kirilishin Date: Sun, 6 Feb 2022 03:33:21 +0300 Subject: [PATCH 1/2] Show task status only for running dags or only for the last finished dag --- airflow/www/views.py | 36 ++++++++++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/airflow/www/views.py b/airflow/www/views.py index 678d1cc4db3f1..c2b62328c21da 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -922,7 +922,9 @@ def task_stats(self, session=None): # Select all task_instances from active dag_runs. running_task_instance_query_result = session.query( - TaskInstance.dag_id.label('dag_id'), TaskInstance.state.label('state') + TaskInstance.dag_id.label('dag_id'), + TaskInstance.state.label('state'), + sqla.literal(True).label('is_dag_running'), ).join( running_dag_run_query_result, and_( @@ -946,7 +948,11 @@ def task_stats(self, session=None): # Select all task_instances from active dag_runs. # If no dag_run is active, return task instances from most recent dag_run. 
last_task_instance_query_result = ( - session.query(TaskInstance.dag_id.label('dag_id'), TaskInstance.state.label('state')) + session.query( + TaskInstance.dag_id.label('dag_id'), + TaskInstance.state.label('state'), + sqla.literal(False).label('is_dag_running'), + ) .join(TaskInstance.dag_run) .join( last_dag_run, @@ -963,14 +969,28 @@ def task_stats(self, session=None): else: final_task_instance_query_result = running_task_instance_query_result.subquery('final_ti') - qry = session.query( - final_task_instance_query_result.c.dag_id, - final_task_instance_query_result.c.state, - sqla.func.count(), - ).group_by(final_task_instance_query_result.c.dag_id, final_task_instance_query_result.c.state) + qry = ( + session.query( + final_task_instance_query_result.c.dag_id, + final_task_instance_query_result.c.state, + final_task_instance_query_result.c.is_dag_running, + sqla.func.count(), + ) + .group_by( + final_task_instance_query_result.c.dag_id, + final_task_instance_query_result.c.state, + final_task_instance_query_result.c.is_dag_running, + ) + .order_by(final_task_instance_query_result.c.is_dag_running.desc()) + ) data = {} - for dag_id, state, count in qry: + has_running_dags = False + for dag_id, state, is_dag_running, count in qry: + if is_dag_running: + has_running_dags = True + elif has_running_dags: + break if dag_id not in data: data[dag_id] = {} data[dag_id][state] = count From 2b1575720920c0598d8de217754f755244e741b0 Mon Sep 17 00:00:00 2001 From: Aleksej Kirilishin Date: Thu, 10 Feb 2022 16:33:22 +0300 Subject: [PATCH 2/2] Brought the logic of getting task statistics into a separate function --- airflow/www/views.py | 42 +++++++++++++++++++++++++---------- tests/www/views/test_views.py | 35 ++++++++++++++++++++++++++++- 2 files changed, 64 insertions(+), 13 deletions(-) diff --git a/airflow/www/views.py b/airflow/www/views.py index c2b62328c21da..ec02bf0bb810d 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -491,6 +491,31 @@ def 
get_downstream(task): return result +def get_task_stats_from_query(qry): + """ + Return a dict of the task quantity, grouped by dag id and task status. + + :param qry: The data in the format (<dag_id>, <state>, <is_dag_running>, <count>), + ordered by <dag_id> and <is_dag_running> + """ + data = {} + last_dag_id = None + has_running_dags = False + for dag_id, state, is_dag_running, count in qry: + if last_dag_id != dag_id: + last_dag_id = dag_id + has_running_dags = False + elif not is_dag_running and has_running_dags: + continue + + if is_dag_running: + has_running_dags = True + if dag_id not in data: + data[dag_id] = {} + data[dag_id][state] = count + return data + + ###################################################################################### # Error handlers ###################################################################################### @@ -981,20 +1006,13 @@ def task_stats(self, session=None): final_task_instance_query_result.c.state, final_task_instance_query_result.c.is_dag_running, ) - .order_by(final_task_instance_query_result.c.is_dag_running.desc()) + .order_by( + final_task_instance_query_result.c.dag_id, + final_task_instance_query_result.c.is_dag_running.desc(), + ) ) - data = {} - has_running_dags = False - for dag_id, state, is_dag_running, count in qry: - if is_dag_running: - has_running_dags = True - elif has_running_dags: - break - if dag_id not in data: - data[dag_id] = {} - data[dag_id][state] = count - + data = get_task_stats_from_query(qry) payload = {} for dag_id in filter_dag_ids: payload[dag_id] = [] diff --git a/tests/www/views/test_views.py b/tests/www/views/test_views.py index 35f5910280267..66cf54ffd82e2 100644 --- a/tests/www/views/test_views.py +++ b/tests/www/views/test_views.py @@ -24,7 +24,13 @@ from airflow.configuration import initialize_config from airflow.plugins_manager import AirflowPlugin, EntryPointSource from airflow.www import views -from airflow.www.views import get_key_paths, get_safe_url, get_value_from_path, truncate_task_duration +from airflow.www.views import ( 
+ get_key_paths, + get_safe_url, + get_task_stats_from_query, + get_value_from_path, + truncate_task_duration, +) from tests.test_utils.config import conf_vars from tests.test_utils.mock_plugins import mock_plugin_manager from tests.test_utils.www import check_content_in_response, check_content_not_in_response @@ -339,3 +345,30 @@ def test_dag_edit_privileged_requires_view_has_action_decorators(cls: type): action_funcs = action_funcs - {"action_post"} for action_function in action_funcs: assert_decorator_used(cls, action_function, views.action_has_dag_edit_access) + + +def test_get_task_stats_from_query(): + query_data = [ + ['dag1', 'queued', True, 1], + ['dag1', 'running', True, 2], + ['dag1', 'success', False, 3], + ['dag2', 'running', True, 4], + ['dag2', 'success', True, 5], + ['dag3', 'success', False, 6], + ] + expected_data = { + 'dag1': { + 'queued': 1, + 'running': 2, + }, + 'dag2': { + 'running': 4, + 'success': 5, + }, + 'dag3': { + 'success': 6, + }, + } + + data = get_task_stats_from_query(query_data) + assert data == expected_data