Skip to content

Commit

Permalink
Adding only_active parameter to /dags endpoint (#14306)
Browse files Browse the repository at this point in the history
I noticed that the `/dags` endpoint returns information on all entries in the DAG table, which is often many more DAGs than are activeand likely includes DAGs which have been removed from Airflow. 

This PR adds a boolean `only_active` parameter to the `/dags` endpoint which will then only return active DAGs. 

I also noticed that this endpoint was hitting a deprecated codepath by dumping a `DAG` object to the DAGDetailSchema, thus hitting calling `DAG.is_paused()` I have updated the schema to call the correct function (`DAG.get_is_paused`) since I'm assuming the deprecated functions may be removed some day.
  • Loading branch information
SamWheating committed Jun 14, 2021
1 parent c6e043f commit 9526a24
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 2 deletions.
7 changes: 5 additions & 2 deletions airflow/api_connexion/endpoints/dag_endpoint.py
Expand Up @@ -60,9 +60,12 @@ def get_dag_details(dag_id):
@security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG)])
@format_parameters({'limit': check_limit})
@provide_session
def get_dags(limit, session, offset=0):
def get_dags(limit, session, offset=0, only_active=True):
"""Get all DAGs."""
dags_query = session.query(DagModel).filter(~DagModel.is_subdag, DagModel.is_active)
if only_active:
dags_query = session.query(DagModel).filter(~DagModel.is_subdag, DagModel.is_active)
else:
dags_query = session.query(DagModel).filter(~DagModel.is_subdag)

readable_dags = current_app.appbuilder.sm.get_accessible_dag_ids(g.user)

Expand Down
11 changes: 11 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
Expand Up @@ -407,6 +407,13 @@ paths:
- $ref: '#/components/parameters/PageLimit'
- $ref: '#/components/parameters/PageOffset'
- $ref: '#/components/parameters/OrderBy'
- name: only_active
in: query
schema:
type: boolean
default: true
required: false
description: Only return active DAGs.
responses:
'200':
description: Success.
Expand Down Expand Up @@ -1795,6 +1802,10 @@ components:
type: boolean
nullable: true
description: Whether the DAG is paused.
is_active:
type: boolean
nullable: true
description: Whether the DAG is currently seen by the scheduler(s).
is_subdag:
description: Whether the DAG is SubDAG.
type: boolean
Expand Down
13 changes: 13 additions & 0 deletions airflow/api_connexion/schemas/dag_schema.py
Expand Up @@ -49,6 +49,7 @@ class Meta:
dag_id = auto_field(dump_only=True)
root_dag_id = auto_field(dump_only=True)
is_paused = auto_field()
is_active = auto_field(dump_only=True)
is_subdag = auto_field(dump_only=True)
fileloc = auto_field(dump_only=True)
file_token = fields.Method("get_token", dump_only=True)
Expand Down Expand Up @@ -85,6 +86,8 @@ class DAGDetailSchema(DAGSchema):
default_view = fields.String()
params = fields.Dict()
tags = fields.Method("get_tags", dump_only=True)
is_paused = fields.Method("get_is_paused", dump_only=True)
is_active = fields.Method("get_is_active", dump_only=True)

@staticmethod
def get_tags(obj: DAG):
Expand All @@ -101,6 +104,16 @@ def get_owners(obj: DAG):
return []
return obj.owner.split(",")

@staticmethod
def get_is_paused(obj: DAG):
"""Checks entry in DAG table to see if this DAG is paused"""
return obj.get_is_paused()

@staticmethod
def get_is_active(obj: DAG):
"""Checks entry in DAG table to see if this DAG is active"""
return obj.get_is_active()


class DAGCollection(NamedTuple):
"""List of DAGs with metadata"""
Expand Down
6 changes: 6 additions & 0 deletions airflow/models/dag.py
Expand Up @@ -802,6 +802,12 @@ def concurrency_reached(self):
)
return self.get_concurrency_reached()

@provide_session
def get_is_active(self, session=None) -> Optional[None]:
"""Returns a boolean indicating whether this DAG is active"""
qry = session.query(DagModel).filter(DagModel.dag_id == self.dag_id)
return qry.value(DagModel.is_active)

@provide_session
def get_is_paused(self, session=None) -> Optional[None]:
"""Returns a boolean indicating whether this DAG is paused"""
Expand Down
84 changes: 84 additions & 0 deletions tests/api_connexion/endpoints/test_dag_endpoint.py
Expand Up @@ -148,6 +148,7 @@ def test_should_respond_200(self):
"fileloc": "/tmp/dag_1.py",
"file_token": 'Ii90bXAvZGFnXzEucHki.EnmIdPaUPo26lHQClbWMbDFD1Pk',
"is_paused": False,
"is_active": True,
"is_subdag": False,
"owners": [],
"root_dag_id": None,
Expand All @@ -172,6 +173,7 @@ def test_should_respond_200_with_schedule_interval_none(self, session):
"fileloc": "/tmp/dag_1.py",
"file_token": 'Ii90bXAvZGFnXzEucHki.EnmIdPaUPo26lHQClbWMbDFD1Pk',
"is_paused": False,
"is_active": False,
"is_subdag": False,
"owners": [],
"root_dag_id": None,
Expand Down Expand Up @@ -228,6 +230,7 @@ def test_should_respond_200(self):
"fileloc": __file__,
"file_token": FILE_TOKEN,
"is_paused": None,
"is_active": None,
"is_subdag": False,
"orientation": "LR",
"owners": ['airflow'],
Expand Down Expand Up @@ -260,6 +263,7 @@ def test_should_response_200_with_doc_md_none(self):
"fileloc": __file__,
"file_token": FILE_TOKEN,
"is_paused": None,
"is_active": None,
"is_subdag": False,
"orientation": "LR",
"owners": ['airflow'],
Expand Down Expand Up @@ -292,6 +296,7 @@ def test_should_response_200_for_null_start_date(self):
"fileloc": __file__,
"file_token": FILE_TOKEN,
"is_paused": None,
"is_active": None,
"is_subdag": False,
"orientation": "LR",
"owners": ['airflow'],
Expand Down Expand Up @@ -328,6 +333,7 @@ def test_should_respond_200_serialized(self):
"fileloc": __file__,
"file_token": FILE_TOKEN,
"is_paused": None,
"is_active": None,
"is_subdag": False,
"orientation": "LR",
"owners": ['airflow'],
Expand Down Expand Up @@ -366,6 +372,7 @@ def test_should_respond_200_serialized(self):
'fileloc': __file__,
"file_token": FILE_TOKEN,
'is_paused': None,
"is_active": None,
'is_subdag': False,
'orientation': 'LR',
'owners': ['airflow'],
Expand Down Expand Up @@ -417,6 +424,7 @@ def test_should_respond_200(self, session):
"fileloc": "/tmp/dag_1.py",
"file_token": file_token,
"is_paused": False,
"is_active": True,
"is_subdag": False,
"owners": [],
"root_dag_id": None,
Expand All @@ -432,6 +440,80 @@ def test_should_respond_200(self, session):
"fileloc": "/tmp/dag_2.py",
"file_token": file_token2,
"is_paused": False,
"is_active": True,
"is_subdag": False,
"owners": [],
"root_dag_id": None,
"schedule_interval": {
"__type": "CronExpression",
"value": "2 2 * * *",
},
"tags": [],
},
],
"total_entries": 2,
} == response.json

def test_only_active_true_returns_active_dags(self):
self._create_dag_models(1)
self._create_deactivated_dag()
response = self.client.get("api/v1/dags?only_active=True", environ_overrides={'REMOTE_USER': "test"})
file_token = SERIALIZER.dumps("/tmp/dag_1.py")
assert response.status_code == 200
assert {
"dags": [
{
"dag_id": "TEST_DAG_1",
"description": None,
"fileloc": "/tmp/dag_1.py",
"file_token": file_token,
"is_paused": False,
"is_active": True,
"is_subdag": False,
"owners": [],
"root_dag_id": None,
"schedule_interval": {
"__type": "CronExpression",
"value": "2 2 * * *",
},
"tags": [],
}
],
"total_entries": 1,
} == response.json

def test_only_active_false_returns_all_dags(self):
self._create_dag_models(1)
self._create_deactivated_dag()
response = self.client.get("api/v1/dags?only_active=False", environ_overrides={'REMOTE_USER': "test"})
file_token = SERIALIZER.dumps("/tmp/dag_1.py")
file_token_2 = SERIALIZER.dumps("/tmp/dag_del_1.py")
assert response.status_code == 200
assert {
"dags": [
{
"dag_id": "TEST_DAG_1",
"description": None,
"fileloc": "/tmp/dag_1.py",
"file_token": file_token,
"is_paused": False,
"is_active": True,
"is_subdag": False,
"owners": [],
"root_dag_id": None,
"schedule_interval": {
"__type": "CronExpression",
"value": "2 2 * * *",
},
"tags": [],
},
{
"dag_id": "TEST_DAG_DELETED_1",
"description": None,
"fileloc": "/tmp/dag_del_1.py",
"file_token": file_token_2,
"is_paused": False,
"is_active": False,
"is_subdag": False,
"owners": [],
"root_dag_id": None,
Expand Down Expand Up @@ -538,6 +620,7 @@ def test_should_respond_200_on_patch_is_paused(self):
"fileloc": "/tmp/dag_1.py",
"file_token": self.file_token,
"is_paused": False,
"is_active": False,
"is_subdag": False,
"owners": [],
"root_dag_id": None,
Expand Down Expand Up @@ -619,6 +702,7 @@ def test_should_respond_200_with_update_mask(self):
"fileloc": "/tmp/dag_1.py",
"file_token": self.file_token,
"is_paused": False,
"is_active": False,
"is_subdag": False,
"owners": [],
"root_dag_id": None,
Expand Down
5 changes: 5 additions & 0 deletions tests/api_connexion/schemas/test_dag_schema.py
Expand Up @@ -39,6 +39,7 @@ def test_serialize(self):
dag_id="test_dag_id",
root_dag_id="test_root_dag_id",
is_paused=True,
is_active=True,
is_subdag=False,
fileloc="/root/airflow/dags/my_dag.py",
owners="airflow1,airflow2",
Expand All @@ -53,6 +54,7 @@ def test_serialize(self):
"fileloc": "/root/airflow/dags/my_dag.py",
"file_token": SERIALIZER.dumps("/root/airflow/dags/my_dag.py"),
"is_paused": True,
"is_active": True,
"is_subdag": False,
"owners": ["airflow1", "airflow2"],
"root_dag_id": "test_root_dag_id",
Expand All @@ -76,6 +78,7 @@ def test_serialize(self):
"file_token": SERIALIZER.dumps("/tmp/a.py"),
"is_paused": None,
"is_subdag": None,
"is_active": None,
"owners": [],
"root_dag_id": None,
"schedule_interval": None,
Expand All @@ -86,6 +89,7 @@ def test_serialize(self):
"description": None,
"fileloc": "/tmp/a.py",
"file_token": SERIALIZER.dumps("/tmp/a.py"),
"is_active": None,
"is_paused": None,
"is_subdag": None,
"owners": [],
Expand Down Expand Up @@ -120,6 +124,7 @@ def test_serialize(self):
'doc_md': 'docs',
'fileloc': __file__,
"file_token": SERIALIZER.dumps(__file__),
"is_active": None,
'is_paused': None,
'is_subdag': False,
'orientation': 'LR',
Expand Down

0 comments on commit 9526a24

Please sign in to comment.