Skip to content

Commit

Permalink
Azure: New sftp to wasb operator (#18877)
Browse files Browse the repository at this point in the history
* Azure: New sftp to wasb operator

Co-authored-by: Guilherme da Silva Goncalves <guilherme.goncalves@bancointer.com.br>
Co-authored-by: Josh Fell <48934154+josh-fell@users.noreply.github.com>
  • Loading branch information
3 people committed Dec 22, 2021
1 parent d57ebb7 commit 341bf5a
Show file tree
Hide file tree
Showing 9 changed files with 703 additions and 2 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.rst
Expand Up @@ -683,7 +683,7 @@ dingding http
discord http
google amazon,apache.beam,apache.cassandra,cncf.kubernetes,facebook,microsoft.azure,microsoft.mssql,mysql,oracle,postgres,presto,salesforce,sftp,ssh,trino
hashicorp google
microsoft.azure google,oracle
microsoft.azure google,oracle,sftp
mysql amazon,presto,trino,vertica
postgres amazon
salesforce tableau
Expand Down
3 changes: 2 additions & 1 deletion airflow/providers/dependencies.json
Expand Up @@ -59,7 +59,8 @@
],
"microsoft.azure": [
"google",
"oracle"
"oracle",
"sftp"
],
"mysql": [
"amazon",
Expand Down
@@ -0,0 +1,72 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.providers.microsoft.azure.operators.wasb_delete_blob import WasbDeleteBlobOperator
from airflow.providers.microsoft.azure.transfers.sftp_to_wasb import SFTPToWasbOperator
from airflow.providers.sftp.hooks.sftp import SFTPHook
from airflow.providers.sftp.operators.sftp import SFTPOperator

AZURE_CONTAINER_NAME = os.environ.get("AZURE_CONTAINER_NAME", "airflow")
BLOB_PREFIX = os.environ.get("AZURE_BLOB_PREFIX", "airflow")
SFTP_SRC_PATH = os.environ.get("SFTP_SRC_PATH", "/sftp")
LOCAL_FILE_PATH = os.environ.get("LOCAL_SRC_PATH", "/tmp")
SAMPLE_FILENAME = os.environ.get("SFTP_SAMPLE_FILENAME", "sftp_to_wasb_test.txt")
FILE_COMPLETE_PATH = os.path.join(LOCAL_FILE_PATH, SAMPLE_FILENAME)
SFTP_FILE_COMPLETE_PATH = os.path.join(SFTP_SRC_PATH, SAMPLE_FILENAME)


@task
def delete_sftp_file():
"""Delete a file at SFTP SERVER"""
SFTPHook().delete_file(SFTP_FILE_COMPLETE_PATH)


with DAG(
"example_sftp_to_wasb",
schedule_interval=None,
catchup=False,
start_date=datetime(2021, 1, 1), # Override to match your needs
) as dag:
transfer_files_to_sftp_step = SFTPOperator(
task_id="transfer_files_from_local_to_sftp",
local_filepath=FILE_COMPLETE_PATH,
remote_filepath=SFTP_FILE_COMPLETE_PATH,
)

# [START how_to_sftp_to_wasb]
transfer_files_to_azure = SFTPToWasbOperator(
task_id="transfer_files_from_sftp_to_wasb",
# SFTP args
sftp_source_path=SFTP_SRC_PATH,
# AZURE args
container_name=AZURE_CONTAINER_NAME,
blob_prefix=BLOB_PREFIX,
)
# [END how_to_sftp_to_wasb]

delete_blob_file_step = WasbDeleteBlobOperator(
task_id="delete_blob_files",
container_name=AZURE_CONTAINER_NAME,
blob_name=BLOB_PREFIX + SAMPLE_FILENAME,
)

transfer_files_to_sftp_step >> transfer_files_to_azure >> delete_blob_file_step >> delete_sftp_file()
4 changes: 4 additions & 0 deletions airflow/providers/microsoft/azure/provider.yaml
Expand Up @@ -179,6 +179,10 @@ transfers:
target-integration-name: Google Cloud Storage (GCS)
how-to-guide: /docs/apache-airflow-providers-microsoft-azure/operators/azure_blob_to_gcs.rst
python-module: airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs
- source-integration-name: SSH File Transfer Protocol (SFTP)
target-integration-name: Microsoft Azure Blob Storage
how-to-guide: /docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst
python-module: airflow.providers.microsoft.azure.transfers.sftp_to_wasb

hook-class-names: # deprecated - to be removed after providers add dependency on Airflow 2.2.0+
- airflow.providers.microsoft.azure.hooks.base_azure.AzureBaseHook
Expand Down
199 changes: 199 additions & 0 deletions airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
@@ -0,0 +1,199 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains SFTP to Azure Blob Storage operator."""
import os
import sys
from collections import namedtuple
from tempfile import NamedTemporaryFile
from typing import Dict, List, Optional, Tuple

if sys.version_info >= (3, 8):
from functools import cached_property
else:
from cached_property import cached_property

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
from airflow.providers.sftp.hooks.sftp import SFTPHook

WILDCARD = "*"
SftpFile = namedtuple('SftpFile', 'sftp_file_path, blob_name')


class SFTPToWasbOperator(BaseOperator):
"""
Transfer files to Azure Blob Storage from SFTP server.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:SFTPToWasbOperator`
:param sftp_source_path: The sftp remote path. This is the specified file path
for downloading the single file or multiple files from the SFTP server.
You can use only one wildcard within your path. The wildcard can appear
inside the path or at the end of the path.
:type sftp_source_path: str
:param container_name: Name of the container.
:type container_name: str
:param blob_prefix: Prefix to name a blob.
:type blob_prefix: str
:param sftp_conn_id: The sftp connection id. The name or identifier for
establishing a connection to the SFTP server.
:type sftp_conn_id: str
:param wasb_conn_id: Reference to the wasb connection.
:type wasb_conn_id: str
:param load_options: Optional keyword arguments that
``WasbHook.load_file()`` takes.
:type load_options: dict
:param move_object: When move object is True, the object is moved instead
of copied to the new location. This is the equivalent of a mv command
as opposed to a cp command.
:param wasb_overwrite_object: Whether the blob to be uploaded
should overwrite the current data.
When wasb_overwrite_object is True, it will overwrite the existing data.
If set to False, the operation might fail with
ResourceExistsError in case a blob object already exists.
:type move_object: bool
"""

template_fields = ("sftp_source_path", "container_name", "blob_prefix")

def __init__(
self,
*,
sftp_source_path: str,
container_name: str,
blob_prefix: str = "",
sftp_conn_id: str = "sftp_default",
wasb_conn_id: str = 'wasb_default',
load_options: Optional[Dict] = None,
move_object: bool = False,
wasb_overwrite_object: bool = False,
**kwargs,
) -> None:
super().__init__(**kwargs)

self.sftp_source_path = sftp_source_path
self.blob_prefix = blob_prefix
self.sftp_conn_id = sftp_conn_id
self.wasb_conn_id = wasb_conn_id
self.container_name = container_name
self.wasb_conn_id = wasb_conn_id
self.load_options = load_options or {"overwrite": wasb_overwrite_object}
self.move_object = move_object

def dry_run(self) -> None:
super().dry_run()
sftp_files: List[SftpFile] = self.get_sftp_files_map()
for file in sftp_files:
self.log.info(
'Process will upload file from (SFTP) %s to wasb://%s as %s',
file.sftp_file_path,
self.container_name,
file.blob_name,
)
if self.move_object:
self.log.info("Executing delete of %s", file)

def execute(self, context: Dict) -> None:
"""Upload a file from SFTP to Azure Blob Storage."""
sftp_files: List[SftpFile] = self.get_sftp_files_map()
uploaded_files = self.copy_files_to_wasb(sftp_files)
if self.move_object:
self.delete_files(uploaded_files)

def get_sftp_files_map(self) -> List[SftpFile]:
"""Get SFTP files from the source path, it may use a WILDCARD to this end."""
sftp_files = []

sftp_complete_path, prefix, delimiter = self.get_tree_behavior()

found_files, _, _ = self.sftp_hook.get_tree_map(
sftp_complete_path, prefix=prefix, delimiter=delimiter
)

self.log.info("Found %s files at sftp source path: %s", str(len(found_files)), self.sftp_source_path)

for file in found_files:
future_blob_name = self.get_full_path_blob(file)
sftp_files.append(SftpFile(file, future_blob_name))

return sftp_files

def get_tree_behavior(self) -> Tuple[str, Optional[str], Optional[str]]:
"""Extracts from source path the tree behavior to interact with the remote folder"""
self.check_wildcards_limit()

if self.source_path_contains_wildcard:

prefix, delimiter = self.sftp_source_path.split(WILDCARD, 1)

sftp_complete_path = os.path.dirname(prefix)

return sftp_complete_path, prefix, delimiter

return self.sftp_source_path, None, None

def check_wildcards_limit(self) -> None:
"""Check if there are multiple wildcards used in the SFTP source path."""
total_wildcards = self.sftp_source_path.count(WILDCARD)
if total_wildcards > 1:
raise AirflowException(
"Only one wildcard '*' is allowed in sftp_source_path parameter. "
f"Found {total_wildcards} in {self.sftp_source_path}."
)

@property
def source_path_contains_wildcard(self) -> bool:
"""Checks if the SFTP source path contains a wildcard."""
return WILDCARD in self.sftp_source_path

@cached_property
def sftp_hook(self) -> SFTPHook:
"""Property of sftp hook to be re-used."""
return SFTPHook(self.sftp_conn_id)

def get_full_path_blob(self, file: str) -> str:
"""Get a blob name based on the previous name and a blob_prefix variable"""
return self.blob_prefix + os.path.basename(file)

def copy_files_to_wasb(self, sftp_files: List[SftpFile]) -> List[str]:
"""Upload a list of files from sftp_files to Azure Blob Storage with a new Blob Name."""
uploaded_files = []
wasb_hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
for file in sftp_files:
with NamedTemporaryFile("w") as tmp:
self.sftp_hook.retrieve_file(file.sftp_file_path, tmp.name)
self.log.info(
'Uploading %s to wasb://%s as %s',
file.sftp_file_path,
self.container_name,
file.blob_name,
)
wasb_hook.load_file(tmp.name, self.container_name, file.blob_name, **self.load_options)

uploaded_files.append(file.sftp_file_path)

return uploaded_files

def delete_files(self, uploaded_files: List[str]) -> None:
"""Delete files at SFTP which have been moved to Azure Blob Storage."""
for sftp_file_path in uploaded_files:
self.log.info("Executing delete of %s", sftp_file_path)
self.sftp_hook.delete_file(sftp_file_path)
@@ -0,0 +1,61 @@

.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Azure Blob Storage Transfer Operator
====================================
The Blob service stores text and binary data as objects in the cloud.
The Blob service offers the following three resources: the storage account, containers, and blobs.
Within your storage account, containers provide a way to organize sets of blobs.
For more information about the service visit `Azure Blob Storage API documentation <https://docs.microsoft.com/en-us/rest/api/storageservices/blob-service-rest-api>`_.

Before you begin
^^^^^^^^^^^^^^^^
Before using Blob Storage within Airflow you need to authenticate your account with Token, Login and Password.
Please follow Azure
`instructions <https://docs.microsoft.com/en-us/azure/storage/common/storage-account-keys-manage?tabs=azure-portal>`_
to do it.

See following example.
Set values for these fields:

.. code-block::
SFTP Conn Id: sftp_default
WASB Conn Id: wasb_default
.. contents::
:depth: 1
:local:

.. _howto/operator:SFTPToWasbOperator:

Transfer Data from SFTP Source Path to Blob Storage
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Operator transfers data from SFTP Source Path to specified container in Azure Blob Storage

To get information about jobs within a Azure Blob Storage use:
:class:`~airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPToWasbOperator`
Example usage:

.. exampleinclude:: /../../airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py
:language: python
:dedent: 4
:start-after: [START how_to_sftp_to_wasb]
:end-before: [END how_to_sftp_to_wasb]

0 comments on commit 341bf5a

Please sign in to comment.