Skip to content

Commit

Permalink
Show task status only for running dags or only for the last finished …
Browse files Browse the repository at this point in the history
…dag (#21352)

* Show task status only for running dags or only for the last finished dag

* Brought the logic of getting task statistics into a separate function
  • Loading branch information
avkirilishin committed Feb 14, 2022
1 parent 28378d8 commit 28d7bde
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 14 deletions.
64 changes: 51 additions & 13 deletions airflow/www/views.py
Expand Up @@ -491,6 +491,31 @@ def get_downstream(task):
return result


def get_task_stats_from_query(qry):
"""
Return a dict of the task quantity, grouped by dag id and task status.
:param qry: The data in the format (<dag id>, <task state>, <is dag running>, <task count>),
ordered by <dag id> and <is dag running>
"""
data = {}
last_dag_id = None
has_running_dags = False
for dag_id, state, is_dag_running, count in qry:
if last_dag_id != dag_id:
last_dag_id = dag_id
has_running_dags = False
elif not is_dag_running and has_running_dags:
continue

if is_dag_running:
has_running_dags = True
if dag_id not in data:
data[dag_id] = {}
data[dag_id][state] = count
return data


######################################################################################
# Error handlers
######################################################################################
Expand Down Expand Up @@ -922,7 +947,9 @@ def task_stats(self, session=None):

# Select all task_instances from active dag_runs.
running_task_instance_query_result = session.query(
TaskInstance.dag_id.label('dag_id'), TaskInstance.state.label('state')
TaskInstance.dag_id.label('dag_id'),
TaskInstance.state.label('state'),
sqla.literal(True).label('is_dag_running'),
).join(
running_dag_run_query_result,
and_(
Expand All @@ -946,7 +973,11 @@ def task_stats(self, session=None):
# Select all task_instances from active dag_runs.
# If no dag_run is active, return task instances from most recent dag_run.
last_task_instance_query_result = (
session.query(TaskInstance.dag_id.label('dag_id'), TaskInstance.state.label('state'))
session.query(
TaskInstance.dag_id.label('dag_id'),
TaskInstance.state.label('state'),
sqla.literal(False).label('is_dag_running'),
)
.join(TaskInstance.dag_run)
.join(
last_dag_run,
Expand All @@ -963,18 +994,25 @@ def task_stats(self, session=None):
else:
final_task_instance_query_result = running_task_instance_query_result.subquery('final_ti')

qry = session.query(
final_task_instance_query_result.c.dag_id,
final_task_instance_query_result.c.state,
sqla.func.count(),
).group_by(final_task_instance_query_result.c.dag_id, final_task_instance_query_result.c.state)

data = {}
for dag_id, state, count in qry:
if dag_id not in data:
data[dag_id] = {}
data[dag_id][state] = count
qry = (
session.query(
final_task_instance_query_result.c.dag_id,
final_task_instance_query_result.c.state,
final_task_instance_query_result.c.is_dag_running,
sqla.func.count(),
)
.group_by(
final_task_instance_query_result.c.dag_id,
final_task_instance_query_result.c.state,
final_task_instance_query_result.c.is_dag_running,
)
.order_by(
final_task_instance_query_result.c.dag_id,
final_task_instance_query_result.c.is_dag_running.desc(),
)
)

data = get_task_stats_from_query(qry)
payload = {}
for dag_id in filter_dag_ids:
payload[dag_id] = []
Expand Down
35 changes: 34 additions & 1 deletion tests/www/views/test_views.py
Expand Up @@ -24,7 +24,13 @@
from airflow.configuration import initialize_config
from airflow.plugins_manager import AirflowPlugin, EntryPointSource
from airflow.www import views
from airflow.www.views import get_key_paths, get_safe_url, get_value_from_path, truncate_task_duration
from airflow.www.views import (
get_key_paths,
get_safe_url,
get_task_stats_from_query,
get_value_from_path,
truncate_task_duration,
)
from tests.test_utils.config import conf_vars
from tests.test_utils.mock_plugins import mock_plugin_manager
from tests.test_utils.www import check_content_in_response, check_content_not_in_response
Expand Down Expand Up @@ -339,3 +345,30 @@ def test_dag_edit_privileged_requires_view_has_action_decorators(cls: type):
action_funcs = action_funcs - {"action_post"}
for action_function in action_funcs:
assert_decorator_used(cls, action_function, views.action_has_dag_edit_access)


def test_get_task_stats_from_query():
query_data = [
['dag1', 'queued', True, 1],
['dag1', 'running', True, 2],
['dag1', 'success', False, 3],
['dag2', 'running', True, 4],
['dag2', 'success', True, 5],
['dag3', 'success', False, 6],
]
expected_data = {
'dag1': {
'queued': 1,
'running': 2,
},
'dag2': {
'running': 4,
'success': 5,
},
'dag3': {
'success': 6,
},
}

data = get_task_stats_from_query(query_data)
assert data == expected_data

0 comments on commit 28d7bde

Please sign in to comment.