Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding only_active parameter to /dags endpoint #14306

Merged
merged 10 commits into from Jun 14, 2021
7 changes: 5 additions & 2 deletions airflow/api_connexion/endpoints/dag_endpoint.py
Expand Up @@ -60,9 +60,12 @@ def get_dag_details(dag_id):
@security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG)])
@format_parameters({'limit': check_limit})
@provide_session
def get_dags(limit, session, offset=0):
def get_dags(limit, session, offset=0, only_active=True):
"""Get all DAGs."""
dags_query = session.query(DagModel).filter(~DagModel.is_subdag, DagModel.is_active)
if only_active:
dags_query = session.query(DagModel).filter(~DagModel.is_subdag, DagModel.is_active)
else:
dags_query = session.query(DagModel).filter(~DagModel.is_subdag)

readable_dags = current_app.appbuilder.sm.get_accessible_dag_ids(g.user)

Expand Down
11 changes: 11 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
Expand Up @@ -407,6 +407,13 @@ paths:
- $ref: '#/components/parameters/PageLimit'
- $ref: '#/components/parameters/PageOffset'
- $ref: '#/components/parameters/OrderBy'
- name: only_active
in: query
schema:
SamWheating marked this conversation as resolved.
Show resolved Hide resolved
type: boolean
default: true
SamWheating marked this conversation as resolved.
Show resolved Hide resolved
required: false
description: Only return active DAGs.
responses:
'200':
description: Success.
Expand Down Expand Up @@ -1795,6 +1802,10 @@ components:
type: boolean
nullable: true
description: Whether the DAG is paused.
is_active:
type: boolean
nullable: true
description: Whether the DAG is currently seen by the scheduler(s).
is_subdag:
description: Whether the DAG is SubDAG.
type: boolean
Expand Down
13 changes: 13 additions & 0 deletions airflow/api_connexion/schemas/dag_schema.py
Expand Up @@ -49,6 +49,7 @@ class Meta:
dag_id = auto_field(dump_only=True)
root_dag_id = auto_field(dump_only=True)
is_paused = auto_field()
is_active = auto_field(dump_only=True)
is_subdag = auto_field(dump_only=True)
fileloc = auto_field(dump_only=True)
file_token = fields.Method("get_token", dump_only=True)
Expand Down Expand Up @@ -85,6 +86,8 @@ class DAGDetailSchema(DAGSchema):
default_view = fields.String()
params = fields.Dict()
tags = fields.Method("get_tags", dump_only=True)
is_paused = fields.Method("get_is_paused", dump_only=True)
is_active = fields.Method("get_is_active", dump_only=True)

@staticmethod
def get_tags(obj: DAG):
Expand All @@ -101,6 +104,16 @@ def get_owners(obj: DAG):
return []
return obj.owner.split(",")

@staticmethod
def get_is_paused(obj: DAG):
SamWheating marked this conversation as resolved.
Show resolved Hide resolved
"""Checks entry in DAG table to see if this DAG is paused"""
return obj.get_is_paused()

@staticmethod
def get_is_active(obj: DAG):
"""Checks entry in DAG table to see if this DAG is active"""
return obj.get_is_active()
SamWheating marked this conversation as resolved.
Show resolved Hide resolved


class DAGCollection(NamedTuple):
"""List of DAGs with metadata"""
Expand Down
6 changes: 6 additions & 0 deletions airflow/models/dag.py
Expand Up @@ -802,6 +802,12 @@ def concurrency_reached(self):
)
return self.get_concurrency_reached()

@provide_session
def get_is_active(self, session=None) -> Optional[None]:
"""Returns a boolean indicating whether this DAG is active"""
qry = session.query(DagModel).filter(DagModel.dag_id == self.dag_id)
return qry.value(DagModel.is_active)

SamWheating marked this conversation as resolved.
Show resolved Hide resolved
@provide_session
def get_is_paused(self, session=None) -> Optional[None]:
"""Returns a boolean indicating whether this DAG is paused"""
Expand Down
84 changes: 84 additions & 0 deletions tests/api_connexion/endpoints/test_dag_endpoint.py
Expand Up @@ -148,6 +148,7 @@ def test_should_respond_200(self):
"fileloc": "/tmp/dag_1.py",
"file_token": 'Ii90bXAvZGFnXzEucHki.EnmIdPaUPo26lHQClbWMbDFD1Pk',
"is_paused": False,
"is_active": True,
"is_subdag": False,
"owners": [],
"root_dag_id": None,
Expand All @@ -172,6 +173,7 @@ def test_should_respond_200_with_schedule_interval_none(self, session):
"fileloc": "/tmp/dag_1.py",
"file_token": 'Ii90bXAvZGFnXzEucHki.EnmIdPaUPo26lHQClbWMbDFD1Pk',
"is_paused": False,
"is_active": False,
"is_subdag": False,
"owners": [],
"root_dag_id": None,
Expand Down Expand Up @@ -228,6 +230,7 @@ def test_should_respond_200(self):
"fileloc": __file__,
"file_token": FILE_TOKEN,
"is_paused": None,
"is_active": None,
"is_subdag": False,
"orientation": "LR",
"owners": ['airflow'],
Expand Down Expand Up @@ -260,6 +263,7 @@ def test_should_response_200_with_doc_md_none(self):
"fileloc": __file__,
"file_token": FILE_TOKEN,
"is_paused": None,
"is_active": None,
"is_subdag": False,
"orientation": "LR",
"owners": ['airflow'],
Expand Down Expand Up @@ -292,6 +296,7 @@ def test_should_response_200_for_null_start_date(self):
"fileloc": __file__,
"file_token": FILE_TOKEN,
"is_paused": None,
"is_active": None,
SamWheating marked this conversation as resolved.
Show resolved Hide resolved
"is_subdag": False,
"orientation": "LR",
"owners": ['airflow'],
Expand Down Expand Up @@ -328,6 +333,7 @@ def test_should_respond_200_serialized(self):
"fileloc": __file__,
"file_token": FILE_TOKEN,
"is_paused": None,
"is_active": None,
"is_subdag": False,
"orientation": "LR",
"owners": ['airflow'],
Expand Down Expand Up @@ -366,6 +372,7 @@ def test_should_respond_200_serialized(self):
'fileloc': __file__,
"file_token": FILE_TOKEN,
'is_paused': None,
"is_active": None,
'is_subdag': False,
'orientation': 'LR',
'owners': ['airflow'],
Expand Down Expand Up @@ -417,6 +424,7 @@ def test_should_respond_200(self, session):
"fileloc": "/tmp/dag_1.py",
"file_token": file_token,
"is_paused": False,
"is_active": True,
"is_subdag": False,
"owners": [],
"root_dag_id": None,
Expand All @@ -432,6 +440,80 @@ def test_should_respond_200(self, session):
"fileloc": "/tmp/dag_2.py",
"file_token": file_token2,
"is_paused": False,
"is_active": True,
"is_subdag": False,
"owners": [],
"root_dag_id": None,
"schedule_interval": {
"__type": "CronExpression",
"value": "2 2 * * *",
},
"tags": [],
},
],
"total_entries": 2,
} == response.json

def test_only_active_true_returns_active_dags(self):
self._create_dag_models(1)
self._create_deactivated_dag()
response = self.client.get("api/v1/dags?only_active=True", environ_overrides={'REMOTE_USER': "test"})
file_token = SERIALIZER.dumps("/tmp/dag_1.py")
assert response.status_code == 200
assert {
"dags": [
{
"dag_id": "TEST_DAG_1",
"description": None,
"fileloc": "/tmp/dag_1.py",
"file_token": file_token,
"is_paused": False,
"is_active": True,
"is_subdag": False,
"owners": [],
"root_dag_id": None,
"schedule_interval": {
"__type": "CronExpression",
"value": "2 2 * * *",
},
"tags": [],
}
],
"total_entries": 1,
} == response.json

def test_only_active_false_returns_all_dags(self):
self._create_dag_models(1)
self._create_deactivated_dag()
response = self.client.get("api/v1/dags?only_active=False", environ_overrides={'REMOTE_USER': "test"})
file_token = SERIALIZER.dumps("/tmp/dag_1.py")
file_token_2 = SERIALIZER.dumps("/tmp/dag_del_1.py")
assert response.status_code == 200
assert {
"dags": [
{
"dag_id": "TEST_DAG_1",
"description": None,
"fileloc": "/tmp/dag_1.py",
"file_token": file_token,
"is_paused": False,
"is_active": True,
"is_subdag": False,
"owners": [],
"root_dag_id": None,
"schedule_interval": {
"__type": "CronExpression",
"value": "2 2 * * *",
},
"tags": [],
},
{
"dag_id": "TEST_DAG_DELETED_1",
"description": None,
"fileloc": "/tmp/dag_del_1.py",
"file_token": file_token_2,
"is_paused": False,
"is_active": False,
"is_subdag": False,
"owners": [],
"root_dag_id": None,
Expand Down Expand Up @@ -538,6 +620,7 @@ def test_should_respond_200_on_patch_is_paused(self):
"fileloc": "/tmp/dag_1.py",
"file_token": self.file_token,
"is_paused": False,
"is_active": False,
"is_subdag": False,
"owners": [],
"root_dag_id": None,
Expand Down Expand Up @@ -619,6 +702,7 @@ def test_should_respond_200_with_update_mask(self):
"fileloc": "/tmp/dag_1.py",
"file_token": self.file_token,
"is_paused": False,
"is_active": False,
"is_subdag": False,
"owners": [],
"root_dag_id": None,
Expand Down
5 changes: 5 additions & 0 deletions tests/api_connexion/schemas/test_dag_schema.py
Expand Up @@ -39,6 +39,7 @@ def test_serialize(self):
dag_id="test_dag_id",
root_dag_id="test_root_dag_id",
is_paused=True,
is_active=True,
is_subdag=False,
fileloc="/root/airflow/dags/my_dag.py",
owners="airflow1,airflow2",
Expand All @@ -53,6 +54,7 @@ def test_serialize(self):
"fileloc": "/root/airflow/dags/my_dag.py",
"file_token": SERIALIZER.dumps("/root/airflow/dags/my_dag.py"),
"is_paused": True,
"is_active": True,
"is_subdag": False,
"owners": ["airflow1", "airflow2"],
"root_dag_id": "test_root_dag_id",
Expand All @@ -76,6 +78,7 @@ def test_serialize(self):
"file_token": SERIALIZER.dumps("/tmp/a.py"),
"is_paused": None,
"is_subdag": None,
"is_active": None,
"owners": [],
"root_dag_id": None,
"schedule_interval": None,
Expand All @@ -86,6 +89,7 @@ def test_serialize(self):
"description": None,
"fileloc": "/tmp/a.py",
"file_token": SERIALIZER.dumps("/tmp/a.py"),
"is_active": None,
"is_paused": None,
"is_subdag": None,
"owners": [],
Expand Down Expand Up @@ -120,6 +124,7 @@ def test_serialize(self):
'doc_md': 'docs',
'fileloc': __file__,
"file_token": SERIALIZER.dumps(__file__),
"is_active": None,
'is_paused': None,
'is_subdag': False,
'orientation': 'LR',
Expand Down