Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include scheduled slots in pools view #26006

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions airflow/api_connexion/schemas/pool_schema.py
Expand Up @@ -36,6 +36,7 @@ class Meta:
occupied_slots = fields.Method("get_occupied_slots", dump_only=True)
running_slots = fields.Method("get_running_slots", dump_only=True)
queued_slots = fields.Method("get_queued_slots", dump_only=True)
scheduled_slots = fields.Method("get_scheduled_slots", dump_only=True)
open_slots = fields.Method("get_open_slots", dump_only=True)
description = auto_field()

Expand All @@ -54,6 +55,11 @@ def get_queued_slots(obj: Pool) -> int:
"""Returns the queued slots of the pool."""
return obj.queued_slots()

@staticmethod
def get_scheduled_slots(obj: Pool) -> int:
"""Returns the scheduled slots of the pool."""
return obj.scheduled_slots()

@staticmethod
def get_open_slots(obj: Pool) -> float:
"""Returns the open slots of the pool."""
Expand Down
18 changes: 18 additions & 0 deletions airflow/models/pool.py
Expand Up @@ -262,6 +262,24 @@ def queued_slots(self, session: Session = NEW_SESSION):
or 0
)

@provide_session
def scheduled_slots(self, session: Session = NEW_SESSION):
"""
Get the number of slots scheduled at the moment.

:param session: SQLAlchemy ORM Session
:return: the number of scheduled slots
"""
from airflow.models.taskinstance import TaskInstance # Avoid circular import

return int(
session.query(func.sum(TaskInstance.pool_slots))
.filter(TaskInstance.pool == self.pool)
.filter(TaskInstance.state == State.SCHEDULED)
.scalar()
or 0
)

@provide_session
def open_slots(self, session: Session = NEW_SESSION) -> float:
"""
Expand Down
21 changes: 19 additions & 2 deletions airflow/www/views.py
Expand Up @@ -4510,7 +4510,7 @@ class PoolModelView(AirflowModelView):
permissions.ACTION_CAN_ACCESS_MENU,
]

list_columns = ['pool', 'slots', 'running_slots', 'queued_slots']
list_columns = ['pool', 'slots', 'running_slots', 'queued_slots', 'scheduled_slots']
add_columns = ['pool', 'slots', 'description']
edit_columns = ['pool', 'slots', 'description']

Expand Down Expand Up @@ -4567,7 +4567,24 @@ def fqueued_slots(self):
else:
return Markup('<span class="label label-danger">Invalid</span>')

formatters_columns = {'pool': pool_link, 'running_slots': frunning_slots, 'queued_slots': fqueued_slots}
def fscheduled_slots(self):
"""Scheduled slots rendering."""
pool_id = self.get('pool')
scheduled_slots = self.get('scheduled_slots')
if pool_id is not None and scheduled_slots is not None:
url = url_for('TaskInstanceModelView.list', _flt_3_pool=pool_id, _flt_3_state='scheduled')
return Markup("<a href='{url}'>{scheduled_slots}</a>").format(
url=url, scheduled_slots=scheduled_slots
)
else:
return Markup('<span class="label label-danger">Invalid</span>')

formatters_columns = {
'pool': pool_link,
'running_slots': frunning_slots,
'queued_slots': fqueued_slots,
'scheduled_slots': fscheduled_slots,
}

validators_columns = {'pool': [validators.DataRequired()], 'slots': [validators.NumberRange(min=-1)]}

Expand Down
11 changes: 11 additions & 0 deletions tests/api_connexion/endpoints/test_pool_endpoint.py
Expand Up @@ -78,6 +78,7 @@ def test_response_200(self, session):
"occupied_slots": 0,
"running_slots": 0,
"queued_slots": 0,
"scheduled_slots": 0,
"open_slots": 128,
"description": "Default pool",
},
Expand All @@ -87,6 +88,7 @@ def test_response_200(self, session):
"occupied_slots": 0,
"running_slots": 0,
"queued_slots": 0,
"scheduled_slots": 0,
"open_slots": 3,
"description": None,
},
Expand All @@ -110,6 +112,7 @@ def test_response_200_with_order_by(self, session):
"occupied_slots": 0,
"running_slots": 0,
"queued_slots": 0,
"scheduled_slots": 0,
"open_slots": 3,
"description": None,
},
Expand All @@ -119,6 +122,7 @@ def test_response_200_with_order_by(self, session):
"occupied_slots": 0,
"running_slots": 0,
"queued_slots": 0,
"scheduled_slots": 0,
"open_slots": 128,
"description": "Default pool",
},
Expand Down Expand Up @@ -217,6 +221,7 @@ def test_response_200(self, session):
"occupied_slots": 0,
"running_slots": 0,
"queued_slots": 0,
"scheduled_slots": 0,
"open_slots": 3,
"description": None,
} == response.json
Expand Down Expand Up @@ -289,6 +294,7 @@ def test_response_200(self):
"occupied_slots": 0,
"running_slots": 0,
"queued_slots": 0,
"scheduled_slots": 0,
"open_slots": 3,
"description": "test pool",
} == response.json
Expand Down Expand Up @@ -371,6 +377,7 @@ def test_response_200(self, session):
"name": "test_pool_a",
"open_slots": 3,
"running_slots": 0,
"scheduled_slots": 0,
"slots": 3,
"description": None,
} == response.json
Expand Down Expand Up @@ -465,6 +472,7 @@ def test_delete_400(self):
"name": "default_pool",
"open_slots": 3,
"running_slots": 0,
"scheduled_slots": 0,
"slots": 3,
"description": "Default pool",
},
Expand All @@ -480,6 +488,7 @@ def test_delete_400(self):
"name": "default_pool",
"open_slots": 3,
"running_slots": 0,
"scheduled_slots": 0,
"slots": 3,
"description": "Default pool",
},
Expand All @@ -498,6 +507,7 @@ def test_delete_400(self):
"name": "default_pool",
"open_slots": 3,
"running_slots": 0,
"scheduled_slots": 0,
"slots": 3,
"description": "Default pool",
},
Expand Down Expand Up @@ -553,6 +563,7 @@ def test_response_200(self, url, patch_json, expected_name, expected_slots, sess
"occupied_slots": 0,
"running_slots": 0,
"queued_slots": 0,
"scheduled_slots": 0,
"open_slots": expected_slots,
"description": None,
} == response.json
Expand Down
3 changes: 3 additions & 0 deletions tests/api_connexion/schemas/test_pool_schemas.py
Expand Up @@ -43,6 +43,7 @@ def test_serialize(self, session):
"occupied_slots": 0,
"running_slots": 0,
"queued_slots": 0,
"scheduled_slots": 0,
"open_slots": 2,
"description": None,
}
Expand Down Expand Up @@ -73,6 +74,7 @@ def test_serialize(self):
"occupied_slots": 0,
"running_slots": 0,
"queued_slots": 0,
"scheduled_slots": 0,
"open_slots": 3,
"description": None,
},
Expand All @@ -82,6 +84,7 @@ def test_serialize(self):
"occupied_slots": 0,
"running_slots": 0,
"queued_slots": 0,
"scheduled_slots": 0,
"open_slots": 3,
"description": None,
},
Expand Down
2 changes: 2 additions & 0 deletions tests/api_connexion/test_auth.py
Expand Up @@ -77,6 +77,7 @@ def test_success(self):
"occupied_slots": 0,
"running_slots": 0,
"queued_slots": 0,
"scheduled_slots": 0,
"open_slots": 128,
"description": "Default pool",
},
Expand Down Expand Up @@ -149,6 +150,7 @@ def test_success(self):
"occupied_slots": 0,
"running_slots": 0,
"queued_slots": 0,
"scheduled_slots": 0,
"open_slots": 128,
"description": "Default pool",
},
Expand Down
5 changes: 5 additions & 0 deletions tests/www/views/test_views_pool.py
Expand Up @@ -84,8 +84,13 @@ def test_list(app, admin_client, pool_factory):

url = flask.url_for('TaskInstanceModelView.list', _flt_3_pool='test-pool', _flt_3_state='queued')
queued_tag = markupsafe.Markup("<a href='{url}'>{slots}</a>").format(url=url, slots=0)

url = flask.url_for('TaskInstanceModelView.list', _flt_3_pool='test-pool', _flt_3_state='scheduled')
scheduled_tag = markupsafe.Markup("<a href='{url}'>{slots}</a>").format(url=url, slots=0)

check_content_in_response(used_tag, resp)
check_content_in_response(queued_tag, resp)
check_content_in_response(scheduled_tag, resp)


def test_pool_muldelete(session, admin_client, pool_factory):
Expand Down