Skip to content

Commit

Permalink
Include scheduled slots in pools view
Browse files Browse the repository at this point in the history
  • Loading branch information
csp33 authored and potiuk committed Aug 28, 2022
1 parent 1c73304 commit 22f6089
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 2 deletions.
6 changes: 6 additions & 0 deletions airflow/api_connexion/schemas/pool_schema.py
Expand Up @@ -36,6 +36,7 @@ class Meta:
occupied_slots = fields.Method("get_occupied_slots", dump_only=True)
running_slots = fields.Method("get_running_slots", dump_only=True)
queued_slots = fields.Method("get_queued_slots", dump_only=True)
scheduled_slots = fields.Method("get_scheduled_slots", dump_only=True)
open_slots = fields.Method("get_open_slots", dump_only=True)
description = auto_field()

Expand All @@ -54,6 +55,11 @@ def get_queued_slots(obj: Pool) -> int:
"""Returns the queued slots of the pool."""
return obj.queued_slots()

@staticmethod
def get_scheduled_slots(obj: Pool) -> int:
"""Returns the scheduled slots of the pool."""
return obj.scheduled_slots()

@staticmethod
def get_open_slots(obj: Pool) -> float:
"""Returns the open slots of the pool."""
Expand Down
18 changes: 18 additions & 0 deletions airflow/models/pool.py
Expand Up @@ -262,6 +262,24 @@ def queued_slots(self, session: Session = NEW_SESSION):
or 0
)

@provide_session
def scheduled_slots(self, session: Session = NEW_SESSION):
"""
Get the number of slots scheduled at the moment.
:param session: SQLAlchemy ORM Session
:return: the number of scheduled slots
"""
from airflow.models.taskinstance import TaskInstance # Avoid circular import

return int(
session.query(func.sum(TaskInstance.pool_slots))
.filter(TaskInstance.pool == self.pool)
.filter(TaskInstance.state == State.SCHEDULED)
.scalar()
or 0
)

@provide_session
def open_slots(self, session: Session = NEW_SESSION) -> float:
"""
Expand Down
21 changes: 19 additions & 2 deletions airflow/www/views.py
Expand Up @@ -4510,7 +4510,7 @@ class PoolModelView(AirflowModelView):
permissions.ACTION_CAN_ACCESS_MENU,
]

list_columns = ['pool', 'slots', 'running_slots', 'queued_slots']
list_columns = ['pool', 'slots', 'running_slots', 'queued_slots', 'scheduled_slots']
add_columns = ['pool', 'slots', 'description']
edit_columns = ['pool', 'slots', 'description']

Expand Down Expand Up @@ -4567,7 +4567,24 @@ def fqueued_slots(self):
else:
return Markup('<span class="label label-danger">Invalid</span>')

formatters_columns = {'pool': pool_link, 'running_slots': frunning_slots, 'queued_slots': fqueued_slots}
def fscheduled_slots(self):
"""Scheduled slots rendering."""
pool_id = self.get('pool')
scheduled_slots = self.get('scheduled_slots')
if pool_id is not None and scheduled_slots is not None:
url = url_for('TaskInstanceModelView.list', _flt_3_pool=pool_id, _flt_3_state='scheduled')
return Markup("<a href='{url}'>{scheduled_slots}</a>").format(
url=url, scheduled_slots=scheduled_slots
)
else:
return Markup('<span class="label label-danger">Invalid</span>')

formatters_columns = {
'pool': pool_link,
'running_slots': frunning_slots,
'queued_slots': fqueued_slots,
'scheduled_slots': fscheduled_slots,
}

validators_columns = {'pool': [validators.DataRequired()], 'slots': [validators.NumberRange(min=-1)]}

Expand Down
11 changes: 11 additions & 0 deletions tests/api_connexion/endpoints/test_pool_endpoint.py
Expand Up @@ -78,6 +78,7 @@ def test_response_200(self, session):
"occupied_slots": 0,
"running_slots": 0,
"queued_slots": 0,
"scheduled_slots": 0,
"open_slots": 128,
"description": "Default pool",
},
Expand All @@ -87,6 +88,7 @@ def test_response_200(self, session):
"occupied_slots": 0,
"running_slots": 0,
"queued_slots": 0,
"scheduled_slots": 0,
"open_slots": 3,
"description": None,
},
Expand All @@ -110,6 +112,7 @@ def test_response_200_with_order_by(self, session):
"occupied_slots": 0,
"running_slots": 0,
"queued_slots": 0,
"scheduled_slots": 0,
"open_slots": 3,
"description": None,
},
Expand All @@ -119,6 +122,7 @@ def test_response_200_with_order_by(self, session):
"occupied_slots": 0,
"running_slots": 0,
"queued_slots": 0,
"scheduled_slots": 0,
"open_slots": 128,
"description": "Default pool",
},
Expand Down Expand Up @@ -217,6 +221,7 @@ def test_response_200(self, session):
"occupied_slots": 0,
"running_slots": 0,
"queued_slots": 0,
"scheduled_slots": 0,
"open_slots": 3,
"description": None,
} == response.json
Expand Down Expand Up @@ -289,6 +294,7 @@ def test_response_200(self):
"occupied_slots": 0,
"running_slots": 0,
"queued_slots": 0,
"scheduled_slots": 0,
"open_slots": 3,
"description": "test pool",
} == response.json
Expand Down Expand Up @@ -371,6 +377,7 @@ def test_response_200(self, session):
"name": "test_pool_a",
"open_slots": 3,
"running_slots": 0,
"scheduled_slots": 0,
"slots": 3,
"description": None,
} == response.json
Expand Down Expand Up @@ -465,6 +472,7 @@ def test_delete_400(self):
"name": "default_pool",
"open_slots": 3,
"running_slots": 0,
"scheduled_slots": 0,
"slots": 3,
"description": "Default pool",
},
Expand All @@ -480,6 +488,7 @@ def test_delete_400(self):
"name": "default_pool",
"open_slots": 3,
"running_slots": 0,
"scheduled_slots": 0,
"slots": 3,
"description": "Default pool",
},
Expand All @@ -498,6 +507,7 @@ def test_delete_400(self):
"name": "default_pool",
"open_slots": 3,
"running_slots": 0,
"scheduled_slots": 0,
"slots": 3,
"description": "Default pool",
},
Expand Down Expand Up @@ -553,6 +563,7 @@ def test_response_200(self, url, patch_json, expected_name, expected_slots, sess
"occupied_slots": 0,
"running_slots": 0,
"queued_slots": 0,
"scheduled_slots": 0,
"open_slots": expected_slots,
"description": None,
} == response.json
Expand Down
3 changes: 3 additions & 0 deletions tests/api_connexion/schemas/test_pool_schemas.py
Expand Up @@ -43,6 +43,7 @@ def test_serialize(self, session):
"occupied_slots": 0,
"running_slots": 0,
"queued_slots": 0,
"scheduled_slots": 0,
"open_slots": 2,
"description": None,
}
Expand Down Expand Up @@ -73,6 +74,7 @@ def test_serialize(self):
"occupied_slots": 0,
"running_slots": 0,
"queued_slots": 0,
"scheduled_slots": 0,
"open_slots": 3,
"description": None,
},
Expand All @@ -82,6 +84,7 @@ def test_serialize(self):
"occupied_slots": 0,
"running_slots": 0,
"queued_slots": 0,
"scheduled_slots": 0,
"open_slots": 3,
"description": None,
},
Expand Down
2 changes: 2 additions & 0 deletions tests/api_connexion/test_auth.py
Expand Up @@ -77,6 +77,7 @@ def test_success(self):
"occupied_slots": 0,
"running_slots": 0,
"queued_slots": 0,
"scheduled_slots": 0,
"open_slots": 128,
"description": "Default pool",
},
Expand Down Expand Up @@ -149,6 +150,7 @@ def test_success(self):
"occupied_slots": 0,
"running_slots": 0,
"queued_slots": 0,
"scheduled_slots": 0,
"open_slots": 128,
"description": "Default pool",
},
Expand Down
5 changes: 5 additions & 0 deletions tests/www/views/test_views_pool.py
Expand Up @@ -84,8 +84,13 @@ def test_list(app, admin_client, pool_factory):

url = flask.url_for('TaskInstanceModelView.list', _flt_3_pool='test-pool', _flt_3_state='queued')
queued_tag = markupsafe.Markup("<a href='{url}'>{slots}</a>").format(url=url, slots=0)

url = flask.url_for('TaskInstanceModelView.list', _flt_3_pool='test-pool', _flt_3_state='scheduled')
scheduled_tag = markupsafe.Markup("<a href='{url}'>{slots}</a>").format(url=url, slots=0)

check_content_in_response(used_tag, resp)
check_content_in_response(queued_tag, resp)
check_content_in_response(scheduled_tag, resp)


def test_pool_muldelete(session, admin_client, pool_factory):
Expand Down

0 comments on commit 22f6089

Please sign in to comment.