Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix API List DAGs inconsistent with UI/CLI #16318

Merged
merged 9 commits into from Jun 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 10 additions & 4 deletions airflow/api_connexion/endpoints/dag_endpoint.py
Expand Up @@ -59,11 +59,17 @@ def get_dag_details(dag_id):

@security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG)])
@format_parameters({'limit': check_limit})
def get_dags(limit, offset=0):
@provide_session
def get_dags(limit, session, offset=0):
"""Get all DAGs."""
readable_dags = current_app.appbuilder.sm.get_readable_dags(g.user)
dags = readable_dags.order_by(DagModel.dag_id).offset(offset).limit(limit).all()
total_entries = readable_dags.count()
dags_query = session.query(DagModel).filter(~DagModel.is_subdag, DagModel.is_active)

readable_dags = current_app.appbuilder.sm.get_accessible_dag_ids(g.user)

dags_query = dags_query.filter(DagModel.dag_id.in_(readable_dags))
total_entries = len(dags_query.all())

dags = dags_query.order_by(DagModel.dag_id).offset(offset).limit(limit).all()

return dags_collection_schema.dump(DAGCollection(dags=dags, total_entries=total_entries))

Expand Down
19 changes: 18 additions & 1 deletion tests/api_connexion/endpoints/test_dag_endpoint.py
Expand Up @@ -121,9 +121,20 @@ def _create_dag_models(self, count, session=None):
dag_id=f"TEST_DAG_{num}",
fileloc=f"/tmp/dag_{num}.py",
schedule_interval="2 2 * * *",
is_active=True,
)
session.add(dag_model)

@provide_session
def _create_deactivated_dag(self, session=None):
dag_model = DagModel(
dag_id="TEST_DAG_DELETED_1",
fileloc="/tmp/dag_del_1.py",
schedule_interval="2 2 * * *",
is_active=False,
)
session.add(dag_model)


class TestGetDag(TestDagEndpoint):
@conf_vars({("webserver", "secret_key"): "mysecret"})
Expand Down Expand Up @@ -385,12 +396,18 @@ def test_should_raise_404_when_dag_is_not_found(self):


class TestGetDags(TestDagEndpoint):
def test_should_respond_200(self):
@provide_session
def test_should_respond_200(self, session):
self._create_dag_models(2)
self._create_deactivated_dag()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about you add a query that asserts that the number of dags in DagModel is 3

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Number of DAGs in DagModel should exclude inactive DAGs, as far as I understand. In my opinion are inactive DAGs .trash and play no significant role.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know but how are we sure that an inactive dag exists here? Do we have a test for the new method you created? I think that querying DagModel and asserting that it has 3 dags(both active and inactive) at this point will help to know that the method you created works fine. If there's an alteration in the method in the future to work another way, it'll be noticed. What do you think?


dags_query = session.query(DagModel).filter(~DagModel.is_subdag)
assert len(dags_query.all()) == 3

response = self.client.get("api/v1/dags", environ_overrides={'REMOTE_USER': "test"})
file_token = SERIALIZER.dumps("/tmp/dag_1.py")
file_token2 = SERIALIZER.dumps("/tmp/dag_2.py")

assert response.status_code == 200
assert {
"dags": [
Expand Down