From 3305f5fc16976860686941fdbada9db6891da912 Mon Sep 17 00:00:00 2001 From: jpyen <> Date: Mon, 7 Jun 2021 23:50:37 -0700 Subject: [PATCH 1/9] Use get all DAGs, filter DAGs with permissions --- airflow/api_connexion/endpoints/dag_endpoint.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/airflow/api_connexion/endpoints/dag_endpoint.py b/airflow/api_connexion/endpoints/dag_endpoint.py index 5bd6f88fc3665..a1be83d06b1a3 100644 --- a/airflow/api_connexion/endpoints/dag_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_endpoint.py @@ -59,12 +59,17 @@ def get_dag_details(dag_id): @security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG)]) @format_parameters({'limit': check_limit}) -def get_dags(limit, offset=0): +@provide_session +def get_dags(limit, session, offset=0): """Get all DAGs.""" + dags_query = session.query(DagModel).filter(~DagModel.is_subdag, DagModel.is_active) + readable_dags = current_app.appbuilder.sm.get_readable_dags(g.user) - dags = readable_dags.order_by(DagModel.dag_id).offset(offset).limit(limit).all() total_entries = readable_dags.count() + dags_query = dags_query.filter(DagModel.dag_id.in_(readable_dags)) + dags = dags_query.order_by(DagModel.dag_id).offset(offset).limit(limit).all() + return dags_collection_schema.dump(DAGCollection(dags=dags, total_entries=total_entries)) From c30cc0bd155c2dfe0959d7d38a43df266844eb80 Mon Sep 17 00:00:00 2001 From: jpyen <> Date: Tue, 8 Jun 2021 00:26:56 -0700 Subject: [PATCH 2/9] Tested changes --- airflow/api_connexion/endpoints/dag_endpoint.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/api_connexion/endpoints/dag_endpoint.py b/airflow/api_connexion/endpoints/dag_endpoint.py index a1be83d06b1a3..1e49ef61b50ef 100644 --- a/airflow/api_connexion/endpoints/dag_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_endpoint.py @@ -64,11 +64,11 @@ def get_dags(limit, session, offset=0): """Get all DAGs.""" dags_query = session.query(DagModel).filter(~DagModel.is_subdag, DagModel.is_active) - readable_dags = current_app.appbuilder.sm.get_readable_dags(g.user) - total_entries = readable_dags.count() + readable_dags = current_app.appbuilder.sm.get_accessible_dag_ids(g.user) dags_query = dags_query.filter(DagModel.dag_id.in_(readable_dags)) dags = dags_query.order_by(DagModel.dag_id).offset(offset).limit(limit).all() + total_entries = len(dags) return dags_collection_schema.dump(DAGCollection(dags=dags, total_entries=total_entries)) From bcf7d9bd14be064d073b3378940dbba4f5eb2fd0 Mon Sep 17 00:00:00 2001 From: jpyen <> Date: Tue, 8 Jun 2021 11:18:16 -0700 Subject: [PATCH 3/9] Add inactive DAG to test latest changes --- tests/api_connexion/endpoints/test_dag_endpoint.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/api_connexion/endpoints/test_dag_endpoint.py b/tests/api_connexion/endpoints/test_dag_endpoint.py index 031290cb8f146..fbf9b5df3d454 100644 --- a/tests/api_connexion/endpoints/test_dag_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_endpoint.py @@ -123,6 +123,15 @@ def _create_dag_models(self, count, session=None): schedule_interval="2 2 * * *", ) session.add(dag_model) + @provide_session + def _create_deactivated_dag(self, session=None): + dag_model = DagModel( + dag_id=f"TEST_DAG_DELETED_{num}", + fileloc=f"/tmp/dag_del_{num}.py", + schedule_interval="2 2 * * *", + is_active=False, + ) + session.add(dag_model) class TestGetDag(TestDagEndpoint): @@ -387,6 +396,7 @@ def test_should_raise_404_when_dag_is_not_found(self): class TestGetDags(TestDagEndpoint): def test_should_respond_200(self): self._create_dag_models(2) + self._create_deactivated_dag() response = self.client.get("api/v1/dags", environ_overrides={'REMOTE_USER': "test"}) file_token = SERIALIZER.dumps("/tmp/dag_1.py") From d312e1a89aac1554a7f08ea63d02c9f79e63a5f0 Mon Sep 17 00:00:00 2001 From: jpyen <> Date: Tue, 8 Jun 2021 11:19:38 -0700 Subject: [PATCH 4/9] Test inactive DAG --- tests/api_connexion/endpoints/test_dag_endpoint.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/api_connexion/endpoints/test_dag_endpoint.py b/tests/api_connexion/endpoints/test_dag_endpoint.py index fbf9b5df3d454..0667c763431d9 100644 --- a/tests/api_connexion/endpoints/test_dag_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_endpoint.py @@ -126,8 +126,8 @@ def _create_dag_models(self, count, session=None): @provide_session def _create_deactivated_dag(self, session=None): dag_model = DagModel( - dag_id=f"TEST_DAG_DELETED_{num}", - fileloc=f"/tmp/dag_del_{num}.py", + dag_id=f"TEST_DAG_DELETED_1", + fileloc=f"/tmp/dag_del_1.py", schedule_interval="2 2 * * *", is_active=False, ) From 2d12dea95fa0028526eb7677bc5a552e0dd29ad1 Mon Sep 17 00:00:00 2001 From: jpyen <> Date: Tue, 8 Jun 2021 11:20:21 -0700 Subject: [PATCH 5/9] Format code --- tests/api_connexion/endpoints/test_dag_endpoint.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/api_connexion/endpoints/test_dag_endpoint.py b/tests/api_connexion/endpoints/test_dag_endpoint.py index 0667c763431d9..2cbe2539c6396 100644 --- a/tests/api_connexion/endpoints/test_dag_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_endpoint.py @@ -123,6 +123,7 @@ def _create_dag_models(self, count, session=None): schedule_interval="2 2 * * *", ) session.add(dag_model) + @provide_session def _create_deactivated_dag(self, session=None): dag_model = DagModel( From dd85a7f27a89239822a051e5d48f926589bda89c Mon Sep 17 00:00:00 2001 From: jpyen <> Date: Wed, 9 Jun 2021 14:35:37 -0700 Subject: [PATCH 6/9] Test if inactive DAG was actually added to DAG_DB --- tests/api_connexion/endpoints/test_dag_endpoint.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/tests/api_connexion/endpoints/test_dag_endpoint.py b/tests/api_connexion/endpoints/test_dag_endpoint.py index 2cbe2539c6396..3ccab632119e2 100644 --- a/tests/api_connexion/endpoints/test_dag_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_endpoint.py @@ -127,8 +127,8 @@ def _create_dag_models(self, count, session=None): @provide_session def _create_deactivated_dag(self, session=None): dag_model = DagModel( - dag_id=f"TEST_DAG_DELETED_1", - fileloc=f"/tmp/dag_del_1.py", + dag_id="TEST_DAG_DELETED_1", + fileloc="/tmp/dag_del_1.py", schedule_interval="2 2 * * *", is_active=False, ) @@ -395,13 +395,18 @@ def test_should_raise_404_when_dag_is_not_found(self): class TestGetDags(TestDagEndpoint): - def test_should_respond_200(self): + @provide_session + def test_should_respond_200(self, session): self._create_dag_models(2) self._create_deactivated_dag() + dags_query = session.query(DagModel).filter(~DagModel.is_subdag) + assert len(dags_query.all()) == 3 + response = self.client.get("api/v1/dags", environ_overrides={'REMOTE_USER': "test"}) file_token = SERIALIZER.dumps("/tmp/dag_1.py") file_token2 = SERIALIZER.dumps("/tmp/dag_2.py") + assert response.status_code == 200 assert { "dags": [ From 733e4e40518a75ae51300948a403469ddfcaf24a Mon Sep 17 00:00:00 2001 From: jpyen <> Date: Wed, 9 Jun 2021 15:08:51 -0700 Subject: [PATCH 7/9] Test possible ownership issues --- tests/api_connexion/endpoints/test_dag_endpoint.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/api_connexion/endpoints/test_dag_endpoint.py b/tests/api_connexion/endpoints/test_dag_endpoint.py index 3ccab632119e2..a5537b0c5e3a6 100644 --- a/tests/api_connexion/endpoints/test_dag_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_endpoint.py @@ -121,6 +121,7 @@ def _create_dag_models(self, count, session=None): dag_id=f"TEST_DAG_{num}", fileloc=f"/tmp/dag_{num}.py", schedule_interval="2 2 * * *", + owners['test', 'airflow'], ) session.add(dag_model) @@ -130,6 +131,7 @@ def _create_deactivated_dag(self, session=None): dag_id="TEST_DAG_DELETED_1", fileloc="/tmp/dag_del_1.py", schedule_interval="2 2 * * *", + owners['test', 'airflow'], is_active=False, ) session.add(dag_model) From 6e488508f4e680d501e50eefd70dec9211df2b5a Mon Sep 17 00:00:00 2001 From: jpyen <> Date: Wed, 9 Jun 2021 16:56:36 -0700 Subject: [PATCH 8/9] Update DAG ownership --- tests/api_connexion/endpoints/test_dag_endpoint.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/api_connexion/endpoints/test_dag_endpoint.py b/tests/api_connexion/endpoints/test_dag_endpoint.py index a5537b0c5e3a6..84d44ea905abf 100644 --- a/tests/api_connexion/endpoints/test_dag_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_endpoint.py @@ -121,7 +121,7 @@ def _create_dag_models(self, count, session=None): dag_id=f"TEST_DAG_{num}", fileloc=f"/tmp/dag_{num}.py", schedule_interval="2 2 * * *", - owners['test', 'airflow'], + owners=['test', 'airflow'], ) session.add(dag_model) @@ -131,7 +131,7 @@ def _create_deactivated_dag(self, session=None): dag_id="TEST_DAG_DELETED_1", fileloc="/tmp/dag_del_1.py", schedule_interval="2 2 * * *", - owners['test', 'airflow'], + owners=['test', 'airflow'], is_active=False, ) session.add(dag_model) From fbbb66f1003dd44e93d32cdcb0eb053fb546fbec Mon Sep 17 00:00:00 2001 From: jpyen <> Date: Wed, 9 Jun 2021 18:10:35 -0700 Subject: [PATCH 9/9] Fix test issues --- airflow/api_connexion/endpoints/dag_endpoint.py | 3 ++- tests/api_connexion/endpoints/test_dag_endpoint.py | 3 +-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow/api_connexion/endpoints/dag_endpoint.py b/airflow/api_connexion/endpoints/dag_endpoint.py index 1e49ef61b50ef..7b19aed56f827 100644 --- a/airflow/api_connexion/endpoints/dag_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_endpoint.py @@ -67,8 +67,9 @@ def get_dags(limit, session, offset=0): readable_dags = current_app.appbuilder.sm.get_accessible_dag_ids(g.user) dags_query = dags_query.filter(DagModel.dag_id.in_(readable_dags)) + total_entries = len(dags_query.all()) + dags = dags_query.order_by(DagModel.dag_id).offset(offset).limit(limit).all() - total_entries = len(dags) return dags_collection_schema.dump(DAGCollection(dags=dags, total_entries=total_entries)) diff --git a/tests/api_connexion/endpoints/test_dag_endpoint.py b/tests/api_connexion/endpoints/test_dag_endpoint.py index 84d44ea905abf..f3f750158a56d 100644 --- a/tests/api_connexion/endpoints/test_dag_endpoint.py +++ b/tests/api_connexion/endpoints/test_dag_endpoint.py @@ -121,7 +121,7 @@ def _create_dag_models(self, count, session=None): dag_id=f"TEST_DAG_{num}", fileloc=f"/tmp/dag_{num}.py", schedule_interval="2 2 * * *", - owners=['test', 'airflow'], + is_active=True, ) session.add(dag_model) @@ -131,7 +131,6 @@ def _create_deactivated_dag(self, session=None): dag_id="TEST_DAG_DELETED_1", fileloc="/tmp/dag_del_1.py", schedule_interval="2 2 * * *", - owners=['test', 'airflow'], is_active=False, ) session.add(dag_model)