Skip to content

Commit

Permalink
Add DataFlow operations to Azure DataFactory hook (#26345)
Browse files Browse the repository at this point in the history
  • Loading branch information
phanikumv committed Sep 19, 2022
1 parent 8cb0377 commit 24d88e8
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 1 deletion.
113 changes: 112 additions & 1 deletion airflow/providers/microsoft/azure/hooks/data_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
PipelineRun
TriggerResource
datafactory
DataFlow
mgmt
"""
from __future__ import annotations
Expand All @@ -39,6 +40,7 @@
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import (
CreateRunResponse,
DataFlow,
DatasetResource,
Factory,
LinkedServiceResource,
Expand Down Expand Up @@ -479,12 +481,121 @@ def delete_dataset(
Delete the dataset.
:param dataset_name: The dataset name.
:param resource_group_name: The dataset name.
:param resource_group_name: The resource group name.
:param factory_name: The factory name.
:param config: Extra parameters for the ADF client.
"""
self.get_conn().datasets.delete(resource_group_name, factory_name, dataset_name, **config)

@provide_targeted_factory
def get_dataflow(
    self,
    dataflow_name: str,
    resource_group_name: str | None = None,
    factory_name: str | None = None,
    **config: Any,
) -> DataFlow:
    """
    Retrieve a single dataflow from the factory.

    :param dataflow_name: The dataflow name.
    :param resource_group_name: The resource group name.
    :param factory_name: The factory name.
    :param config: Extra parameters for the ADF client.
    :return: The dataflow.
    """
    client = self.get_conn()
    return client.data_flows.get(resource_group_name, factory_name, dataflow_name, **config)

def _dataflow_exists(
self,
dataflow_name: str,
resource_group_name: str | None = None,
factory_name: str | None = None,
) -> bool:
"""Return whether the dataflow already exists."""
dataflows = {
dataflow.name
for dataflow in self.get_conn().data_flows.list_by_factory(resource_group_name, factory_name)
}

return dataflow_name in dataflows

@provide_targeted_factory
def update_dataflow(
    self,
    dataflow_name: str,
    dataflow: DataFlow,
    resource_group_name: str | None = None,
    factory_name: str | None = None,
    **config: Any,
) -> DataFlow:
    """
    Update the dataflow.

    :param dataflow_name: The dataflow name.
    :param dataflow: The dataflow resource definition.
    :param resource_group_name: The resource group name.
    :param factory_name: The factory name.
    :param config: Extra parameters for the ADF client.
    :raise AirflowException: If the dataflow does not exist.
    :return: The dataflow.
    """
    # Guard first: the SDK's create_or_update would silently *create* a new
    # dataflow if the name is unknown, which is not what "update" promises.
    if not self._dataflow_exists(
        dataflow_name,
        resource_group_name,
        factory_name,
    ):
        raise AirflowException(f"Dataflow {dataflow_name!r} does not exist.")

    return self.get_conn().data_flows.create_or_update(
        resource_group_name, factory_name, dataflow_name, dataflow, **config
    )

@provide_targeted_factory
def create_dataflow(
    self,
    dataflow_name: str,
    dataflow: DataFlow,
    resource_group_name: str | None = None,
    factory_name: str | None = None,
    **config: Any,
) -> DataFlow:
    """
    Create the dataflow.

    :param dataflow_name: The dataflow name.
    :param dataflow: The dataflow resource definition.
    :param resource_group_name: The resource group name.
    :param factory_name: The factory name.
    :param config: Extra parameters for the ADF client.
    :raise AirflowException: If the dataflow already exists.
    :return: The dataflow.
    """
    # create_or_update would silently overwrite an existing dataflow;
    # fail instead so "create" never clobbers existing definitions.
    if self._dataflow_exists(dataflow_name, resource_group_name, factory_name):
        raise AirflowException(f"Dataflow {dataflow_name!r} already exists.")

    return self.get_conn().data_flows.create_or_update(
        resource_group_name, factory_name, dataflow_name, dataflow, **config
    )

@provide_targeted_factory
def delete_dataflow(
    self,
    dataflow_name: str,
    resource_group_name: str | None = None,
    factory_name: str | None = None,
    **config: Any,
) -> None:
    """
    Delete the dataflow from the factory.

    :param dataflow_name: The dataflow name.
    :param resource_group_name: The resource group name.
    :param factory_name: The factory name.
    :param config: Extra parameters for the ADF client.
    """
    client = self.get_conn()
    client.data_flows.delete(resource_group_name, factory_name, dataflow_name, **config)

@provide_targeted_factory
def get_pipeline(
self,
Expand Down
62 changes: 62 additions & 0 deletions tests/providers/microsoft/azure/hooks/test_azure_data_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ def hook():
"pipeline_runs",
"triggers",
"trigger_runs",
"data_flows",
]
)

Expand Down Expand Up @@ -342,6 +343,67 @@ def test_delete_dataset(hook: AzureDataFactoryHook, user_args, sdk_args):
hook._conn.datasets.delete.assert_called_with(*sdk_args)


@parametrize(
    explicit_factory=((NAME, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME)),
    implicit_factory=((NAME,), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME)),
)
def test_get_dataflow(hook: AzureDataFactoryHook, user_args, sdk_args):
    """get_dataflow should forward to the SDK with args in SDK order, applying factory defaults."""
    hook.get_dataflow(*user_args)
    mocked_get = hook._conn.data_flows.get
    mocked_get.assert_called_with(*sdk_args)


@parametrize(
    explicit_factory=((NAME, MODEL, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME, MODEL)),
    implicit_factory=((NAME, MODEL), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME, MODEL)),
)
def test_create_dataflow(hook: AzureDataFactoryHook, user_args, sdk_args):
    """create_dataflow should call the SDK create_or_update endpoint with args in SDK order."""
    hook.create_dataflow(*user_args)
    mocked_create = hook._conn.data_flows.create_or_update
    mocked_create.assert_called_with(*sdk_args)


@parametrize(
    explicit_factory=((NAME, MODEL, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME, MODEL)),
    implicit_factory=((NAME, MODEL), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME, MODEL)),
)
def test_update_dataflow(hook: AzureDataFactoryHook, user_args, sdk_args):
    """When the dataflow exists, update_dataflow should delegate to create_or_update."""
    with patch.object(hook, "_dataflow_exists", return_value=True):
        hook.update_dataflow(*user_args)

    hook._conn.data_flows.create_or_update.assert_called_with(*sdk_args)


@parametrize(
    explicit_factory=((NAME, MODEL, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME, MODEL)),
    implicit_factory=((NAME, MODEL), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME, MODEL)),
)
def test_update_dataflow_non_existent(hook: AzureDataFactoryHook, user_args, sdk_args):
    """Updating a dataflow that does not exist must raise rather than implicitly create it."""
    with patch.object(hook, "_dataflow_exists", return_value=False):
        with pytest.raises(AirflowException, match=r"Dataflow .+ does not exist"):
            hook.update_dataflow(*user_args)


@parametrize(
    explicit_factory=((NAME, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME)),
    implicit_factory=((NAME,), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME)),
)
def test_delete_dataflow(hook: AzureDataFactoryHook, user_args, sdk_args):
    """delete_dataflow should forward to the SDK delete endpoint with args in SDK order."""
    hook.delete_dataflow(*user_args)
    mocked_delete = hook._conn.data_flows.delete
    mocked_delete.assert_called_with(*sdk_args)


@parametrize(
explicit_factory=((NAME, RESOURCE_GROUP, FACTORY), (RESOURCE_GROUP, FACTORY, NAME)),
implicit_factory=((NAME,), (DEFAULT_RESOURCE_GROUP, DEFAULT_FACTORY, NAME)),
Expand Down

0 comments on commit 24d88e8

Please sign in to comment.