diff --git a/airflow/api_connexion/schemas/pool_schema.py b/airflow/api_connexion/schemas/pool_schema.py index 1f91b0d5bdd1c..c942485082537 100644 --- a/airflow/api_connexion/schemas/pool_schema.py +++ b/airflow/api_connexion/schemas/pool_schema.py @@ -36,6 +36,7 @@ class Meta: occupied_slots = fields.Method("get_occupied_slots", dump_only=True) running_slots = fields.Method("get_running_slots", dump_only=True) queued_slots = fields.Method("get_queued_slots", dump_only=True) + scheduled_slots = fields.Method("get_scheduled_slots", dump_only=True) open_slots = fields.Method("get_open_slots", dump_only=True) description = auto_field() @@ -54,6 +55,11 @@ def get_queued_slots(obj: Pool) -> int: """Returns the queued slots of the pool.""" return obj.queued_slots() + @staticmethod + def get_scheduled_slots(obj: Pool) -> int: + """Returns the scheduled slots of the pool.""" + return obj.scheduled_slots() + @staticmethod def get_open_slots(obj: Pool) -> float: """Returns the open slots of the pool.""" diff --git a/airflow/models/pool.py b/airflow/models/pool.py index f195b93d42623..dd01dc9931fc7 100644 --- a/airflow/models/pool.py +++ b/airflow/models/pool.py @@ -262,6 +262,24 @@ def queued_slots(self, session: Session = NEW_SESSION): or 0 ) + @provide_session + def scheduled_slots(self, session: Session = NEW_SESSION): + """ + Get the number of slots scheduled at the moment. + + :param session: SQLAlchemy ORM Session + :return: the number of scheduled slots + """ + from airflow.models.taskinstance import TaskInstance # Avoid circular import + + return int( + session.query(func.sum(TaskInstance.pool_slots)) + .filter(TaskInstance.pool == self.pool) + .filter(TaskInstance.state == State.SCHEDULED) + .scalar() + or 0 + ) + @provide_session def open_slots(self, session: Session = NEW_SESSION) -> float: """ diff --git a/airflow/www/views.py b/airflow/www/views.py index e7cdc3aeb8dc3..f750025aa9452 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -4510,7 +4510,7 @@ class PoolModelView(AirflowModelView): permissions.ACTION_CAN_ACCESS_MENU, ] - list_columns = ['pool', 'slots', 'running_slots', 'queued_slots'] + list_columns = ['pool', 'slots', 'running_slots', 'queued_slots', 'scheduled_slots'] add_columns = ['pool', 'slots', 'description'] edit_columns = ['pool', 'slots', 'description'] @@ -4567,7 +4567,24 @@ def fqueued_slots(self): else: return Markup('Invalid') - formatters_columns = {'pool': pool_link, 'running_slots': frunning_slots, 'queued_slots': fqueued_slots} + def fscheduled_slots(self): + """Scheduled slots rendering.""" + pool_id = self.get('pool') + scheduled_slots = self.get('scheduled_slots') + if pool_id is not None and scheduled_slots is not None: + url = url_for('TaskInstanceModelView.list', _flt_3_pool=pool_id, _flt_3_state='scheduled') + return Markup("{scheduled_slots}").format( + url=url, scheduled_slots=scheduled_slots + ) + else: + return Markup('Invalid') + + formatters_columns = { + 'pool': pool_link, + 'running_slots': frunning_slots, + 'queued_slots': fqueued_slots, + 'scheduled_slots': fscheduled_slots, + } validators_columns = {'pool': [validators.DataRequired()], 'slots': [validators.NumberRange(min=-1)]} diff --git a/tests/api_connexion/endpoints/test_pool_endpoint.py b/tests/api_connexion/endpoints/test_pool_endpoint.py index a1f1750637454..0128de64a5a08 100644 --- a/tests/api_connexion/endpoints/test_pool_endpoint.py +++ b/tests/api_connexion/endpoints/test_pool_endpoint.py @@ -78,6 +78,7 @@ def test_response_200(self, session): "occupied_slots": 0, "running_slots": 0, "queued_slots": 0, + "scheduled_slots": 0, "open_slots": 128, "description": "Default pool", }, @@ -87,6 +88,7 @@ def test_response_200(self, session): "occupied_slots": 0, "running_slots": 0, "queued_slots": 0, + "scheduled_slots": 0, "open_slots": 3, "description": None, }, @@ -110,6 +112,7 @@ def test_response_200_with_order_by(self, session): "occupied_slots": 0, "running_slots": 0, "queued_slots": 0, + "scheduled_slots": 0, "open_slots": 3, "description": None, }, @@ -119,6 +122,7 @@ def test_response_200_with_order_by(self, session): "occupied_slots": 0, "running_slots": 0, "queued_slots": 0, + "scheduled_slots": 0, "open_slots": 128, "description": "Default pool", }, @@ -217,6 +221,7 @@ def test_response_200(self, session): "occupied_slots": 0, "running_slots": 0, "queued_slots": 0, + "scheduled_slots": 0, "open_slots": 3, "description": None, } == response.json @@ -289,6 +294,7 @@ def test_response_200(self): "occupied_slots": 0, "running_slots": 0, "queued_slots": 0, + "scheduled_slots": 0, "open_slots": 3, "description": "test pool", } == response.json @@ -371,6 +377,7 @@ def test_response_200(self, session): "name": "test_pool_a", "open_slots": 3, "running_slots": 0, + "scheduled_slots": 0, "slots": 3, "description": None, } == response.json @@ -465,6 +472,7 @@ def test_delete_400(self): "name": "default_pool", "open_slots": 3, "running_slots": 0, + "scheduled_slots": 0, "slots": 3, "description": "Default pool", }, @@ -480,6 +488,7 @@ def test_delete_400(self): "name": "default_pool", "open_slots": 3, "running_slots": 0, + "scheduled_slots": 0, "slots": 3, "description": "Default pool", }, @@ -498,6 +507,7 @@ def test_delete_400(self): "name": "default_pool", "open_slots": 3, "running_slots": 0, + "scheduled_slots": 0, "slots": 3, "description": "Default pool", }, @@ -553,6 +563,7 @@ def test_response_200(self, url, patch_json, expected_name, expected_slots, sess "occupied_slots": 0, "running_slots": 0, "queued_slots": 0, + "scheduled_slots": 0, "open_slots": expected_slots, "description": None, } == response.json diff --git a/tests/api_connexion/schemas/test_pool_schemas.py b/tests/api_connexion/schemas/test_pool_schemas.py index 9aecc61677e0f..5a56622b057f0 100644 --- a/tests/api_connexion/schemas/test_pool_schemas.py +++ b/tests/api_connexion/schemas/test_pool_schemas.py @@ -43,6 +43,7 @@ def test_serialize(self, session): "occupied_slots": 0, "running_slots": 0, "queued_slots": 0, + "scheduled_slots": 0, "open_slots": 2, "description": None, } @@ -73,6 +74,7 @@ def test_serialize(self): "occupied_slots": 0, "running_slots": 0, "queued_slots": 0, + "scheduled_slots": 0, "open_slots": 3, "description": None, }, @@ -82,6 +84,7 @@ def test_serialize(self): "occupied_slots": 0, "running_slots": 0, "queued_slots": 0, + "scheduled_slots": 0, "open_slots": 3, "description": None, }, diff --git a/tests/api_connexion/test_auth.py b/tests/api_connexion/test_auth.py index 4049e35007615..0aeace489be21 100644 --- a/tests/api_connexion/test_auth.py +++ b/tests/api_connexion/test_auth.py @@ -77,6 +77,7 @@ def test_success(self): "occupied_slots": 0, "running_slots": 0, "queued_slots": 0, + "scheduled_slots": 0, "open_slots": 128, "description": "Default pool", }, @@ -149,6 +150,7 @@ def test_success(self): "occupied_slots": 0, "running_slots": 0, "queued_slots": 0, + "scheduled_slots": 0, "open_slots": 128, "description": "Default pool", }, diff --git a/tests/www/views/test_views_pool.py b/tests/www/views/test_views_pool.py index 9b1c0c1d401d3..8d2a76a6ed5c1 100644 --- a/tests/www/views/test_views_pool.py +++ b/tests/www/views/test_views_pool.py @@ -84,8 +84,13 @@ def test_list(app, admin_client, pool_factory): url = flask.url_for('TaskInstanceModelView.list', _flt_3_pool='test-pool', _flt_3_state='queued') queued_tag = markupsafe.Markup("{slots}").format(url=url, slots=0) + + url = flask.url_for('TaskInstanceModelView.list', _flt_3_pool='test-pool', _flt_3_state='scheduled') + scheduled_tag = markupsafe.Markup("{slots}").format(url=url, slots=0) + check_content_in_response(used_tag, resp) check_content_in_response(queued_tag, resp) + check_content_in_response(scheduled_tag, resp) def test_pool_muldelete(session, admin_client, pool_factory):