From 9526a249cceb170ddfa68530fdcc786ec3e9e5c2 Mon Sep 17 00:00:00 2001 From: Sam Wheating Date: Mon, 14 Jun 2021 14:49:11 -0700 Subject: [PATCH] Adding `only_active` parameter to /dags endpoint (#14306) I noticed that the `/dags` endpoint returns information on all entries in the DAG table, which is often many more DAGs than are active and likely includes DAGs which have been removed from Airflow. This PR adds a boolean `only_active` parameter to the `/dags` endpoint which will then only return active DAGs. I also noticed that this endpoint was hitting a deprecated codepath by dumping a `DAG` object to the DAGDetailSchema, thus calling `DAG.is_paused()`. I have updated the schema to call the correct function (`DAG.get_is_paused`) since I'm assuming the deprecated functions may be removed some day. --- .../api_connexion/endpoints/dag_endpoint.py | 7 +- airflow/api_connexion/openapi/v1.yaml | 11 +++ airflow/api_connexion/schemas/dag_schema.py | 13 +++ airflow/models/dag.py | 6 ++ .../endpoints/test_dag_endpoint.py | 84 +++++++++++++++++++ .../api_connexion/schemas/test_dag_schema.py | 5 ++ 6 files changed, 124 insertions(+), 2 deletions(-) diff --git a/airflow/api_connexion/endpoints/dag_endpoint.py b/airflow/api_connexion/endpoints/dag_endpoint.py index 7b19aed56f827..ea4a1e35b2163 100644 --- a/airflow/api_connexion/endpoints/dag_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_endpoint.py @@ -60,9 +60,12 @@ def get_dag_details(dag_id): @security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG)]) @format_parameters({'limit': check_limit}) @provide_session -def get_dags(limit, session, offset=0): +def get_dags(limit, session, offset=0, only_active=True): """Get all DAGs.""" - dags_query = session.query(DagModel).filter(~DagModel.is_subdag, DagModel.is_active) + if only_active: + dags_query = session.query(DagModel).filter(~DagModel.is_subdag, DagModel.is_active) + else: + dags_query = 
session.query(DagModel).filter(~DagModel.is_subdag) readable_dags = current_app.appbuilder.sm.get_accessible_dag_ids(g.user) diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index dbe8b8d34267a..09d5707deb067 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -407,6 +407,13 @@ paths: - $ref: '#/components/parameters/PageLimit' - $ref: '#/components/parameters/PageOffset' - $ref: '#/components/parameters/OrderBy' + - name: only_active + in: query + schema: + type: boolean + default: true + required: false + description: Only return active DAGs. responses: '200': description: Success. @@ -1795,6 +1802,10 @@ components: type: boolean nullable: true description: Whether the DAG is paused. + is_active: + type: boolean + nullable: true + description: Whether the DAG is currently seen by the scheduler(s). is_subdag: description: Whether the DAG is SubDAG. type: boolean diff --git a/airflow/api_connexion/schemas/dag_schema.py b/airflow/api_connexion/schemas/dag_schema.py index aabd215a76ecf..bc5a9cab57c05 100644 --- a/airflow/api_connexion/schemas/dag_schema.py +++ b/airflow/api_connexion/schemas/dag_schema.py @@ -49,6 +49,7 @@ class Meta: dag_id = auto_field(dump_only=True) root_dag_id = auto_field(dump_only=True) is_paused = auto_field() + is_active = auto_field(dump_only=True) is_subdag = auto_field(dump_only=True) fileloc = auto_field(dump_only=True) file_token = fields.Method("get_token", dump_only=True) @@ -85,6 +86,8 @@ class DAGDetailSchema(DAGSchema): default_view = fields.String() params = fields.Dict() tags = fields.Method("get_tags", dump_only=True) + is_paused = fields.Method("get_is_paused", dump_only=True) + is_active = fields.Method("get_is_active", dump_only=True) @staticmethod def get_tags(obj: DAG): @@ -101,6 +104,16 @@ def get_owners(obj: DAG): return [] return obj.owner.split(",") + @staticmethod + def get_is_paused(obj: DAG): + """Checks entry in DAG table to see 
if this DAG is paused""" + return obj.get_is_paused() + + @staticmethod + def get_is_active(obj: DAG): + """Checks entry in DAG table to see if this DAG is active""" + return obj.get_is_active() + class DAGCollection(NamedTuple): """List of DAGs with metadata""" diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 027c3c3590a22..c0469c5cc2c87 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -802,6 +802,12 @@ def concurrency_reached(self): ) return self.get_concurrency_reached() + @provide_session + def get_is_active(self, session=None) -> Optional[None]: + """Returns a boolean indicating whether this DAG is active""" + qry = session.query(DagModel).filter(DagModel.dag_id == self.dag_id) + return qry.value(DagModel.is_active) + @provide_session def get_is_paused(self, session=None) -> Optional[None]: """Returns a boolean indicating whether this DAG is paused""" diff --git a/tests/api_connexion/endpoints/test_dag_endpoint.py b/tests/api_connexion/endpoints/test_dag_endpoint.py index f3f750158a56d..e11ad72d9da0e 100644 --- a/tests/api_connexion/endpoints/test_dag_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_endpoint.py @@ -148,6 +148,7 @@ def test_should_respond_200(self): "fileloc": "/tmp/dag_1.py", "file_token": 'Ii90bXAvZGFnXzEucHki.EnmIdPaUPo26lHQClbWMbDFD1Pk', "is_paused": False, + "is_active": True, "is_subdag": False, "owners": [], "root_dag_id": None, @@ -172,6 +173,7 @@ def test_should_respond_200_with_schedule_interval_none(self, session): "fileloc": "/tmp/dag_1.py", "file_token": 'Ii90bXAvZGFnXzEucHki.EnmIdPaUPo26lHQClbWMbDFD1Pk', "is_paused": False, + "is_active": False, "is_subdag": False, "owners": [], "root_dag_id": None, @@ -228,6 +230,7 @@ def test_should_respond_200(self): "fileloc": __file__, "file_token": FILE_TOKEN, "is_paused": None, + "is_active": None, "is_subdag": False, "orientation": "LR", "owners": ['airflow'], @@ -260,6 +263,7 @@ def test_should_response_200_with_doc_md_none(self): "fileloc": 
__file__, "file_token": FILE_TOKEN, "is_paused": None, + "is_active": None, "is_subdag": False, "orientation": "LR", "owners": ['airflow'], @@ -292,6 +296,7 @@ def test_should_response_200_for_null_start_date(self): "fileloc": __file__, "file_token": FILE_TOKEN, "is_paused": None, + "is_active": None, "is_subdag": False, "orientation": "LR", "owners": ['airflow'], @@ -328,6 +333,7 @@ def test_should_respond_200_serialized(self): "fileloc": __file__, "file_token": FILE_TOKEN, "is_paused": None, + "is_active": None, "is_subdag": False, "orientation": "LR", "owners": ['airflow'], @@ -366,6 +372,7 @@ def test_should_respond_200_serialized(self): 'fileloc': __file__, "file_token": FILE_TOKEN, 'is_paused': None, + "is_active": None, 'is_subdag': False, 'orientation': 'LR', 'owners': ['airflow'], @@ -417,6 +424,7 @@ def test_should_respond_200(self, session): "fileloc": "/tmp/dag_1.py", "file_token": file_token, "is_paused": False, + "is_active": True, "is_subdag": False, "owners": [], "root_dag_id": None, @@ -432,6 +440,80 @@ def test_should_respond_200(self, session): "fileloc": "/tmp/dag_2.py", "file_token": file_token2, "is_paused": False, + "is_active": True, + "is_subdag": False, + "owners": [], + "root_dag_id": None, + "schedule_interval": { + "__type": "CronExpression", + "value": "2 2 * * *", + }, + "tags": [], + }, + ], + "total_entries": 2, + } == response.json + + def test_only_active_true_returns_active_dags(self): + self._create_dag_models(1) + self._create_deactivated_dag() + response = self.client.get("api/v1/dags?only_active=True", environ_overrides={'REMOTE_USER': "test"}) + file_token = SERIALIZER.dumps("/tmp/dag_1.py") + assert response.status_code == 200 + assert { + "dags": [ + { + "dag_id": "TEST_DAG_1", + "description": None, + "fileloc": "/tmp/dag_1.py", + "file_token": file_token, + "is_paused": False, + "is_active": True, + "is_subdag": False, + "owners": [], + "root_dag_id": None, + "schedule_interval": { + "__type": "CronExpression", + 
"value": "2 2 * * *", + }, + "tags": [], + } + ], + "total_entries": 1, + } == response.json + + def test_only_active_false_returns_all_dags(self): + self._create_dag_models(1) + self._create_deactivated_dag() + response = self.client.get("api/v1/dags?only_active=False", environ_overrides={'REMOTE_USER': "test"}) + file_token = SERIALIZER.dumps("/tmp/dag_1.py") + file_token_2 = SERIALIZER.dumps("/tmp/dag_del_1.py") + assert response.status_code == 200 + assert { + "dags": [ + { + "dag_id": "TEST_DAG_1", + "description": None, + "fileloc": "/tmp/dag_1.py", + "file_token": file_token, + "is_paused": False, + "is_active": True, + "is_subdag": False, + "owners": [], + "root_dag_id": None, + "schedule_interval": { + "__type": "CronExpression", + "value": "2 2 * * *", + }, + "tags": [], + }, + { + "dag_id": "TEST_DAG_DELETED_1", + "description": None, + "fileloc": "/tmp/dag_del_1.py", + "file_token": file_token_2, + "is_paused": False, + "is_active": False, "is_subdag": False, "owners": [], "root_dag_id": None, @@ -538,6 +620,7 @@ def test_should_respond_200_on_patch_is_paused(self): "fileloc": "/tmp/dag_1.py", "file_token": self.file_token, "is_paused": False, + "is_active": False, "is_subdag": False, "owners": [], "root_dag_id": None, @@ -619,6 +702,7 @@ def test_should_respond_200_with_update_mask(self): "fileloc": "/tmp/dag_1.py", "file_token": self.file_token, "is_paused": False, + "is_active": False, "is_subdag": False, "owners": [], "root_dag_id": None, diff --git a/tests/api_connexion/schemas/test_dag_schema.py b/tests/api_connexion/schemas/test_dag_schema.py index 96aba1f2ec506..4fd8a5282e6e6 100644 --- a/tests/api_connexion/schemas/test_dag_schema.py +++ b/tests/api_connexion/schemas/test_dag_schema.py @@ -39,6 +39,7 @@ def test_serialize(self): dag_id="test_dag_id", root_dag_id="test_root_dag_id", is_paused=True, + is_active=True, is_subdag=False, fileloc="/root/airflow/dags/my_dag.py", owners="airflow1,airflow2", @@ -53,6 +54,7 @@ def test_serialize(self): 
"fileloc": "/root/airflow/dags/my_dag.py", "file_token": SERIALIZER.dumps("/root/airflow/dags/my_dag.py"), "is_paused": True, + "is_active": True, "is_subdag": False, "owners": ["airflow1", "airflow2"], "root_dag_id": "test_root_dag_id", @@ -76,6 +78,7 @@ def test_serialize(self): "file_token": SERIALIZER.dumps("/tmp/a.py"), "is_paused": None, "is_subdag": None, + "is_active": None, "owners": [], "root_dag_id": None, "schedule_interval": None, @@ -86,6 +89,7 @@ def test_serialize(self): "description": None, "fileloc": "/tmp/a.py", "file_token": SERIALIZER.dumps("/tmp/a.py"), + "is_active": None, "is_paused": None, "is_subdag": None, "owners": [], @@ -120,6 +124,7 @@ def test_serialize(self): 'doc_md': 'docs', 'fileloc': __file__, "file_token": SERIALIZER.dumps(__file__), + "is_active": None, 'is_paused': None, 'is_subdag': False, 'orientation': 'LR',