Add dataset events to dataset api (#25039)
jedcunningham committed Jul 15, 2022
1 parent 0d747fa commit fcf8cc2
Showing 6 changed files with 497 additions and 35 deletions.
45 changes: 44 additions & 1 deletion airflow/api_connexion/endpoints/dataset_endpoint.py
@@ -15,19 +15,23 @@
# specific language governing permissions and limitations
# under the License.

from typing import Optional

from sqlalchemy import func
from sqlalchemy.orm import Session

from airflow import Dataset
from airflow.api_connexion import security
from airflow.api_connexion.exceptions import NotFound
from airflow.api_connexion.parameters import apply_sorting, check_limit, format_parameters
from airflow.api_connexion.schemas.dataset_schema import (
    DatasetCollection,
    DatasetEventCollection,
    dataset_collection_schema,
    dataset_event_collection_schema,
    dataset_schema,
)
from airflow.api_connexion.types import APIResponse
from airflow.models.dataset import Dataset, DatasetEvent
from airflow.security import permissions
from airflow.utils.session import NEW_SESSION, provide_session

@@ -59,3 +63,42 @@ def get_datasets(
    query = apply_sorting(query, order_by, {}, allowed_attrs)
    datasets = query.offset(offset).limit(limit).all()
    return dataset_collection_schema.dump(DatasetCollection(datasets=datasets, total_entries=total_entries))


@security.requires_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DATASET)])
@provide_session
@format_parameters({'limit': check_limit})
def get_dataset_events(
    *,
    limit: int,
    offset: int = 0,
    order_by: str = "created_at",
    dataset_id: Optional[int] = None,
    source_dag_id: Optional[str] = None,
    source_task_id: Optional[str] = None,
    source_run_id: Optional[str] = None,
    source_map_index: Optional[int] = None,
    session: Session = NEW_SESSION,
) -> APIResponse:
    """Get dataset events"""
    allowed_attrs = ['source_dag_id', 'source_task_id', 'source_run_id', 'source_map_index', 'created_at']

    query = session.query(DatasetEvent)

    if dataset_id:
        query = query.filter(DatasetEvent.dataset_id == dataset_id)
    if source_dag_id:
        query = query.filter(DatasetEvent.source_dag_id == source_dag_id)
    if source_task_id:
        query = query.filter(DatasetEvent.source_task_id == source_task_id)
    if source_run_id:
        query = query.filter(DatasetEvent.source_run_id == source_run_id)
    if source_map_index:
        query = query.filter(DatasetEvent.source_map_index == source_map_index)

    total_entries = query.count()
    query = apply_sorting(query, order_by, {}, allowed_attrs)
    events = query.offset(offset).limit(limit).all()
    return dataset_event_collection_schema.dump(
        DatasetEventCollection(dataset_events=events, total_entries=total_entries)
    )
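
As a rough usage sketch (not part of this commit), the new operation can be called at GET /api/v1/datasets/events; the webserver URL, credentials, and filter values below are illustrative only:

import requests

# Query the new dataset events endpoint, filtering to events produced by one DAG.
resp = requests.get(
    "http://localhost:8080/api/v1/datasets/events",  # assumes a local webserver
    params={"limit": 10, "order_by": "created_at", "source_dag_id": "example_dag"},
    auth=("admin", "admin"),  # illustrative basic-auth credentials
)
resp.raise_for_status()
for event in resp.json()["dataset_events"]:
    print(event["dataset_id"], event["source_dag_id"], event["created_at"])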
116 changes: 116 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
@@ -1654,6 +1654,36 @@ paths:
        '404':
          $ref: '#/components/responses/NotFound'

  /datasets/events:
    parameters:
      - $ref: '#/components/parameters/PageLimit'
      - $ref: '#/components/parameters/PageOffset'
      - $ref: '#/components/parameters/OrderBy'
      - $ref: '#/components/parameters/FilterDatasetID'
      - $ref: '#/components/parameters/FilterSourceDAGID'
      - $ref: '#/components/parameters/FilterSourceTaskID'
      - $ref: '#/components/parameters/FilterSourceRunID'
      - $ref: '#/components/parameters/FilterSourceMapIndex'
    get:
      summary: Get dataset events
      description: Get dataset events
      x-openapi-router-controller: airflow.api_connexion.endpoints.dataset_endpoint
      operationId: get_dataset_events
      tags: [Dataset]
      responses:
        '200':
          description: Success.
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/DatasetEventCollection'
        '401':
          $ref: '#/components/responses/Unauthenticated'
        '403':
          $ref: '#/components/responses/PermissionDenied'
        '404':
          $ref: '#/components/responses/NotFound'

  /config:
    get:
      summary: Get current configuration
@@ -3461,6 +3491,57 @@ components:
                $ref: '#/components/schemas/Dataset'
        - $ref: '#/components/schemas/CollectionInfo'

    DatasetEvent:
      description: |
        A dataset event.
        *New in version 2.4.0*
      type: object
      properties:
        dataset_id:
          type: integer
          description: The dataset id
        extra:
          type: string
          description: The dataset extra
          nullable: true
        source_dag_id:
          type: string
          description: The DAG ID that updated the dataset.
          nullable: false
        source_task_id:
          type: string
          description: The task ID that updated the dataset.
          nullable: false
        source_run_id:
          type: string
          description: The DAG run ID that updated the dataset.
          nullable: false
        source_map_index:
          type: integer
          description: The task map index that updated the dataset.
          nullable: false
        created_at:
          type: string
          description: The dataset event creation time
          nullable: false


    DatasetEventCollection:
      description: |
        A collection of dataset events.
        *New in version 2.4.0*
      type: object
      allOf:
        - type: object
          properties:
            dataset_events:
              type: array
              items:
                $ref: '#/components/schemas/DatasetEvent'
        - $ref: '#/components/schemas/CollectionInfo'


    # Configuration
    ConfigOption:
@@ -4287,6 +4368,41 @@ components:
        *New in version 2.2.0*
    FilterDatasetID:
      in: query
      name: dataset_id
      schema:
        type: integer
      description: The Dataset ID that updated the dataset.

    FilterSourceDAGID:
      in: query
      name: source_dag_id
      schema:
        type: string
      description: The DAG ID that updated the dataset.

    FilterSourceTaskID:
      in: query
      name: source_task_id
      schema:
        type: string
      description: The task ID that updated the dataset.

    FilterSourceRunID:
      in: query
      name: source_run_id
      schema:
        type: string
      description: The DAG run ID that updated the dataset.

    FilterSourceMapIndex:
      in: query
      name: source_map_index
      schema:
        type: integer
      description: The map index that updated the dataset.

    OrderBy:
      in: query
      name: order_by
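For reference, a response body matching the DatasetEventCollection schema added above would look roughly like the following (shown as a Python literal; all values are illustrative):

example_dataset_event_collection = {
    "dataset_events": [
        {
            "dataset_id": 1,
            "extra": None,
            "source_dag_id": "producer_dag",
            "source_task_id": "update_dataset",
            "source_run_id": "manual__2022-07-15T00:00:00+00:00",
            "source_map_index": -1,
            "created_at": "2022-07-15T00:00:05+00:00",
        }
    ],
    "total_entries": 1,
}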
38 changes: 37 additions & 1 deletion airflow/api_connexion/schemas/dataset_schema.py
@@ -20,7 +20,7 @@
from marshmallow import Schema, fields
from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field

from airflow import Dataset
from airflow.models.dataset import Dataset, DatasetEvent


class DatasetSchema(SQLAlchemySchema):
@@ -54,3 +54,39 @@ class DatasetCollectionSchema(Schema):

dataset_schema = DatasetSchema()
dataset_collection_schema = DatasetCollectionSchema()


class DatasetEventSchema(SQLAlchemySchema):
    """Dataset Event DB schema"""

    class Meta:
        """Meta"""

        model = DatasetEvent

    id = auto_field()
    dataset_id = auto_field()
    extra = auto_field()
    source_task_id = auto_field()
    source_dag_id = auto_field()
    source_run_id = auto_field()
    source_map_index = auto_field()
    created_at = auto_field()


class DatasetEventCollection(NamedTuple):
    """List of Dataset events with meta"""

    dataset_events: List[DatasetEvent]
    total_entries: int


class DatasetEventCollectionSchema(Schema):
    """Dataset Event Collection Schema"""

    dataset_events = fields.List(fields.Nested(DatasetEventSchema))
    total_entries = fields.Int()


dataset_event_schema = DatasetEventSchema()
dataset_event_collection_schema = DatasetEventCollectionSchema()
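
A minimal sketch of how the new schema objects fit together, assuming an initialized Airflow metadata database; the query and its limit are illustrative:

from airflow.api_connexion.schemas.dataset_schema import (
    DatasetEventCollection,
    dataset_event_collection_schema,
)
from airflow.models.dataset import DatasetEvent
from airflow.utils.session import create_session

with create_session() as session:
    # Fetch a few events and serialize them with the collection schema,
    # mirroring what the new endpoint returns.
    events = session.query(DatasetEvent).limit(5).all()
    payload = dataset_event_collection_schema.dump(
        DatasetEventCollection(dataset_events=events, total_entries=len(events))
    )
    print(payload["total_entries"], payload["dataset_events"])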
