Azure: New sftp to wasb operator #18877

Merged — 23 commits, merged on Dec 22, 2021
Changes from all commits (23 commits)
921e3cf
Azure: New sftp to wasb operator
Oct 10, 2021
4f08245
Removes apply_defaults decorator
Oct 11, 2021
e3ae3c1
Removes apply_defaults decorator
Oct 11, 2021
e1c2238
Uses task flow API to example_sftp_to_wasb.py
wolvery Oct 13, 2021
e9a1ee1
Fix task in example_sftp_to_wasb.py
Oct 13, 2021
a0631d2
Enrichs documentation with SFTP and WASB connection.
wolvery Dec 2, 2021
5e2677e
Changes method name to get_full_path_blob
wolvery Dec 2, 2021
c5cee86
Adds variable to expose overwrite mode.
wolvery Dec 3, 2021
241b78d
Adds f-string usage
wolvery Dec 4, 2021
4a9f17f
Fix tests
wolvery Dec 8, 2021
8c1e860
Update tests/providers/microsoft/azure/transfers/test_sftp_to_wasb.py
wolvery Dec 11, 2021
ebd8ac8
Update airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
wolvery Dec 11, 2021
276a897
Update airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
wolvery Dec 11, 2021
a2175e3
Update airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
wolvery Dec 11, 2021
224227f
Update airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
wolvery Dec 11, 2021
f8545a0
Update airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
wolvery Dec 11, 2021
dde29b1
Fix typing to Dict.
wolvery Dec 11, 2021
fe9f493
Disables catchup to example of sftp to wasb
wolvery Dec 11, 2021
ed79e8c
Fix cached property import
wolvery Dec 14, 2021
e37ad5c
Fix comment to delete_files method
wolvery Dec 17, 2021
b5ee6e8
Removes unnecessary documentation.
wolvery Dec 20, 2021
c8e87b0
Uses a better documentation
wolvery Dec 20, 2021
f1441c1
Uses a better documentation
wolvery Dec 20, 2021
2 changes: 1 addition & 1 deletion CONTRIBUTING.rst
@@ -683,7 +683,7 @@ dingding http
discord http
google amazon,apache.beam,apache.cassandra,cncf.kubernetes,facebook,microsoft.azure,microsoft.mssql,mysql,oracle,postgres,presto,salesforce,sftp,ssh,trino
hashicorp google
microsoft.azure google,oracle
microsoft.azure google,oracle,sftp
mysql amazon,presto,trino,vertica
postgres amazon
salesforce tableau
3 changes: 2 additions & 1 deletion airflow/providers/dependencies.json
@@ -59,7 +59,8 @@
  ],
  "microsoft.azure": [
    "google",
    "oracle",
    "sftp"
  ],
  "mysql": [
    "amazon",
72 changes: 72 additions & 0 deletions airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py
@@ -0,0 +1,72 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.providers.microsoft.azure.operators.wasb_delete_blob import WasbDeleteBlobOperator
from airflow.providers.microsoft.azure.transfers.sftp_to_wasb import SFTPToWasbOperator
from airflow.providers.sftp.hooks.sftp import SFTPHook
from airflow.providers.sftp.operators.sftp import SFTPOperator

AZURE_CONTAINER_NAME = os.environ.get("AZURE_CONTAINER_NAME", "airflow")
BLOB_PREFIX = os.environ.get("AZURE_BLOB_PREFIX", "airflow")
SFTP_SRC_PATH = os.environ.get("SFTP_SRC_PATH", "/sftp")
LOCAL_FILE_PATH = os.environ.get("LOCAL_SRC_PATH", "/tmp")
SAMPLE_FILENAME = os.environ.get("SFTP_SAMPLE_FILENAME", "sftp_to_wasb_test.txt")
FILE_COMPLETE_PATH = os.path.join(LOCAL_FILE_PATH, SAMPLE_FILENAME)
SFTP_FILE_COMPLETE_PATH = os.path.join(SFTP_SRC_PATH, SAMPLE_FILENAME)


@task
def delete_sftp_file():
    """Delete a file at SFTP SERVER"""
    SFTPHook().delete_file(SFTP_FILE_COMPLETE_PATH)


with DAG(
    "example_sftp_to_wasb",
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2021, 1, 1),  # Override to match your needs
) as dag:
    transfer_files_to_sftp_step = SFTPOperator(
        task_id="transfer_files_from_local_to_sftp",
        local_filepath=FILE_COMPLETE_PATH,
        remote_filepath=SFTP_FILE_COMPLETE_PATH,
    )

    # [START how_to_sftp_to_wasb]
    transfer_files_to_azure = SFTPToWasbOperator(
        task_id="transfer_files_from_sftp_to_wasb",
        # SFTP args
        sftp_source_path=SFTP_SRC_PATH,
        # AZURE args
        container_name=AZURE_CONTAINER_NAME,
        blob_prefix=BLOB_PREFIX,
    )
    # [END how_to_sftp_to_wasb]

    delete_blob_file_step = WasbDeleteBlobOperator(
        task_id="delete_blob_files",
        container_name=AZURE_CONTAINER_NAME,
        blob_name=BLOB_PREFIX + SAMPLE_FILENAME,
    )

    transfer_files_to_sftp_step >> transfer_files_to_azure >> delete_blob_file_step >> delete_sftp_file()
4 changes: 4 additions & 0 deletions airflow/providers/microsoft/azure/provider.yaml
@@ -179,6 +179,10 @@ transfers:
    target-integration-name: Google Cloud Storage (GCS)
    how-to-guide: /docs/apache-airflow-providers-microsoft-azure/operators/azure_blob_to_gcs.rst
    python-module: airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs
  - source-integration-name: SSH File Transfer Protocol (SFTP)
    target-integration-name: Microsoft Azure Blob Storage
    how-to-guide: /docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst
    python-module: airflow.providers.microsoft.azure.transfers.sftp_to_wasb

hook-class-names:  # deprecated - to be removed after providers add dependency on Airflow 2.2.0+
  - airflow.providers.microsoft.azure.hooks.base_azure.AzureBaseHook
199 changes: 199 additions & 0 deletions airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
@@ -0,0 +1,199 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains SFTP to Azure Blob Storage operator."""
import os
import sys
from collections import namedtuple
from tempfile import NamedTemporaryFile
from typing import Dict, List, Optional, Tuple

if sys.version_info >= (3, 8):
    from functools import cached_property
else:
    from cached_property import cached_property

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
from airflow.providers.sftp.hooks.sftp import SFTPHook

WILDCARD = "*"
SftpFile = namedtuple('SftpFile', 'sftp_file_path, blob_name')


class SFTPToWasbOperator(BaseOperator):
"""
Transfer files to Azure Blob Storage from SFTP server.

.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:SFTPToWasbOperator`

:param sftp_source_path: The sftp remote path. This is the specified file path
for downloading the single file or multiple files from the SFTP server.
You can use only one wildcard within your path. The wildcard can appear
inside the path or at the end of the path.
:type sftp_source_path: str
:param container_name: Name of the container.
:type container_name: str
:param blob_prefix: Prefix to name a blob.
:type blob_prefix: str
:param sftp_conn_id: The sftp connection id. The name or identifier for
establishing a connection to the SFTP server.
:type sftp_conn_id: str
:param wasb_conn_id: Reference to the wasb connection.
:type wasb_conn_id: str
:param load_options: Optional keyword arguments that
``WasbHook.load_file()`` takes.
:type load_options: dict
:param move_object: When move object is True, the object is moved instead
of copied to the new location. This is the equivalent of a mv command
as opposed to a cp command.
:param wasb_overwrite_object: Whether the blob to be uploaded
should overwrite the current data.
When wasb_overwrite_object is True, it will overwrite the existing data.
If set to False, the operation might fail with
ResourceExistsError in case a blob object already exists.
:type move_object: bool
"""

    template_fields = ("sftp_source_path", "container_name", "blob_prefix")

    def __init__(
        self,
        *,
        sftp_source_path: str,
        container_name: str,
        blob_prefix: str = "",
        sftp_conn_id: str = "sftp_default",
        wasb_conn_id: str = 'wasb_default',
        load_options: Optional[Dict] = None,
        move_object: bool = False,
        wasb_overwrite_object: bool = False,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)

        self.sftp_source_path = sftp_source_path
        self.blob_prefix = blob_prefix
        self.sftp_conn_id = sftp_conn_id
        self.wasb_conn_id = wasb_conn_id
        self.container_name = container_name
        self.load_options = load_options or {"overwrite": wasb_overwrite_object}
        self.move_object = move_object

    def dry_run(self) -> None:
        super().dry_run()
        sftp_files: List[SftpFile] = self.get_sftp_files_map()
        for file in sftp_files:
            self.log.info(
                'Process will upload file from (SFTP) %s to wasb://%s as %s',
                file.sftp_file_path,
                self.container_name,
                file.blob_name,
            )
            if self.move_object:
                self.log.info("Executing delete of %s", file)

    def execute(self, context: Dict) -> None:
        """Upload a file from SFTP to Azure Blob Storage."""
        sftp_files: List[SftpFile] = self.get_sftp_files_map()
        uploaded_files = self.copy_files_to_wasb(sftp_files)
        if self.move_object:
            self.delete_files(uploaded_files)

    def get_sftp_files_map(self) -> List[SftpFile]:
        """Get SFTP files from the source path, it may use a WILDCARD to this end."""
        sftp_files = []

        sftp_complete_path, prefix, delimiter = self.get_tree_behavior()

        found_files, _, _ = self.sftp_hook.get_tree_map(
            sftp_complete_path, prefix=prefix, delimiter=delimiter
        )

        self.log.info("Found %s files at sftp source path: %s", str(len(found_files)), self.sftp_source_path)

        for file in found_files:
            future_blob_name = self.get_full_path_blob(file)
            sftp_files.append(SftpFile(file, future_blob_name))

        return sftp_files

    def get_tree_behavior(self) -> Tuple[str, Optional[str], Optional[str]]:
        """Extracts from source path the tree behavior to interact with the remote folder"""
        self.check_wildcards_limit()

        if self.source_path_contains_wildcard:

            prefix, delimiter = self.sftp_source_path.split(WILDCARD, 1)

            sftp_complete_path = os.path.dirname(prefix)

            return sftp_complete_path, prefix, delimiter

        return self.sftp_source_path, None, None

    def check_wildcards_limit(self) -> None:
        """Check if there are multiple wildcards used in the SFTP source path."""
        total_wildcards = self.sftp_source_path.count(WILDCARD)
        if total_wildcards > 1:
            raise AirflowException(
                "Only one wildcard '*' is allowed in sftp_source_path parameter. "
                f"Found {total_wildcards} in {self.sftp_source_path}."
            )

    @property
    def source_path_contains_wildcard(self) -> bool:
        """Checks if the SFTP source path contains a wildcard."""
        return WILDCARD in self.sftp_source_path

    @cached_property
    def sftp_hook(self) -> SFTPHook:
        """Property of sftp hook to be re-used."""
        return SFTPHook(self.sftp_conn_id)

    def get_full_path_blob(self, file: str) -> str:
        """Get a blob name based on the previous name and a blob_prefix variable"""
        return self.blob_prefix + os.path.basename(file)

    def copy_files_to_wasb(self, sftp_files: List[SftpFile]) -> List[str]:
        """Upload a list of files from sftp_files to Azure Blob Storage with a new Blob Name."""
        uploaded_files = []
        wasb_hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
        for file in sftp_files:
            with NamedTemporaryFile("w") as tmp:
                self.sftp_hook.retrieve_file(file.sftp_file_path, tmp.name)
                self.log.info(
                    'Uploading %s to wasb://%s as %s',
                    file.sftp_file_path,
                    self.container_name,
                    file.blob_name,
                )
                wasb_hook.load_file(tmp.name, self.container_name, file.blob_name, **self.load_options)

                uploaded_files.append(file.sftp_file_path)

        return uploaded_files

    def delete_files(self, uploaded_files: List[str]) -> None:
        """Delete files at SFTP which have been moved to Azure Blob Storage."""
        for sftp_file_path in uploaded_files:
            self.log.info("Executing delete of %s", sftp_file_path)
            self.sftp_hook.delete_file(sftp_file_path)
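
For illustration, a minimal sketch of wiring the new operator with a wildcard source path, move semantics, and explicit overwrite behavior; the DAG id, container, and paths below are placeholders, and the connection IDs shown are the operator defaults:

from datetime import datetime

from airflow import DAG
from airflow.providers.microsoft.azure.transfers.sftp_to_wasb import SFTPToWasbOperator

with DAG(
    "example_sftp_to_wasb_move",  # hypothetical DAG id
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    move_csv_files = SFTPToWasbOperator(
        task_id="move_csv_files_to_wasb",
        sftp_source_path="/sftp/incoming/*.csv",  # at most one "*" is allowed
        container_name="landing",                 # placeholder container
        blob_prefix="incoming/",
        sftp_conn_id="sftp_default",
        wasb_conn_id="wasb_default",
        move_object=True,            # delete the source files after a successful upload
        wasb_overwrite_object=True,  # becomes WasbHook.load_file(..., overwrite=True)
    )

Note that ``load_options`` defaults to ``{"overwrite": wasb_overwrite_object}``; passing ``load_options`` explicitly replaces that default rather than merging with it.
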
61 changes: 61 additions & 0 deletions docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst
@@ -0,0 +1,61 @@

.. Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements. See the NOTICE file
   distributed with this work for additional information
   regarding copyright ownership. The ASF licenses this file
   to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance
   with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
   software distributed under the License is distributed on an
   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   KIND, either express or implied. See the License for the
   specific language governing permissions and limitations
   under the License.



Azure Blob Storage Transfer Operator
====================================
The Blob service stores text and binary data as objects in the cloud.
The Blob service offers the following three resources: the storage account, containers, and blobs.
Within your storage account, containers provide a way to organize sets of blobs.
For more information about the service visit `Azure Blob Storage API documentation <https://docs.microsoft.com/en-us/rest/api/storageservices/blob-service-rest-api>`_.

Before you begin
^^^^^^^^^^^^^^^^
Before using Blob Storage within Airflow you need to authenticate your account with a token, login, and password.
Please follow the Azure
`instructions <https://docs.microsoft.com/en-us/azure/storage/common/storage-account-keys-manage?tabs=azure-portal>`_
to do so.

See the following example, and set values for these fields:

.. code-block::

   SFTP Conn Id: sftp_default
   WASB Conn Id: wasb_default
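
For illustration, one way to register these two connections programmatically is sketched below; every host and credential is a placeholder, and the Airflow UI, CLI, or a secrets backend works just as well. For the WASB connection, a common setup stores the storage account name as the login and an account key as the password, though the exact fields depend on the authentication method you chose above.

.. code-block:: python

    # Placeholder credentials only -- never hard-code real secrets.
    from airflow import settings
    from airflow.models import Connection

    sftp_conn = Connection(
        conn_id="sftp_default",
        conn_type="sftp",
        host="sftp.example.com",  # placeholder host
        login="user",
        password="secret",
    )
    wasb_conn = Connection(
        conn_id="wasb_default",
        conn_type="wasb",
        login="mystorageaccount",          # storage account name (placeholder)
        password="<storage-account-key>",  # account key (placeholder)
    )

    session = settings.Session()
    session.add(sftp_conn)
    session.add(wasb_conn)
    session.commit()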

.. contents::
  :depth: 1
  :local:

.. _howto/operator:SFTPToWasbOperator:

Transfer Data from SFTP Source Path to Blob Storage
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The operator transfers data from the SFTP source path to the specified container in Azure Blob Storage.

To transfer files from an SFTP server to Azure Blob Storage, use
:class:`~airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPToWasbOperator`.

Example usage:

.. exampleinclude:: /../../airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py
    :language: python
    :dedent: 4
    :start-after: [START how_to_sftp_to_wasb]
    :end-before: [END how_to_sftp_to_wasb]
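
To illustrate the wildcard handling mentioned in the operator's parameter description, a minimal sketch of how a single ``*`` in ``sftp_source_path`` is turned into the arguments passed to ``SFTPHook.get_tree_map`` (the path below is hypothetical):

.. code-block:: python

    import os

    # Hypothetical source path; more than one "*" raises AirflowException.
    sftp_source_path = "/sftp/incoming/*.csv"

    prefix, delimiter = sftp_source_path.split("*", 1)  # "/sftp/incoming/", ".csv"
    sftp_complete_path = os.path.dirname(prefix)        # "/sftp/incoming"

    # get_tree_map() then returns the files under sftp_complete_path whose full
    # paths start with the prefix and end with the delimiter.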