From 921e3cf11448be6b26b74ce090848c4303e95cf5 Mon Sep 17 00:00:00 2001 From: Guilherme da Silva Goncalves Date: Sun, 10 Oct 2021 20:01:46 -0300 Subject: [PATCH 01/23] Azure: New sftp to wasb operator --- CONTRIBUTING.rst | 2 +- airflow/providers/dependencies.json | 3 +- .../example_dags/example_sftp_to_wasb.py | 77 ++++++ .../providers/microsoft/azure/provider.yaml | 4 + .../microsoft/azure/transfers/sftp_to_wasb.py | 194 ++++++++++++++ .../operators/sftp_to_wasb.rst | 67 +++++ .../azure/transfers/test_sftp_to_wasb.py | 253 ++++++++++++++++++ .../transfers/test_sftp_to_wasb_system.py | 57 ++++ tests/test_utils/sftp_system_helpers.py | 51 ++++ 9 files changed, 706 insertions(+), 2 deletions(-) create mode 100644 airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py create mode 100644 airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py create mode 100644 docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst create mode 100644 tests/providers/microsoft/azure/transfers/test_sftp_to_wasb.py create mode 100644 tests/providers/microsoft/azure/transfers/test_sftp_to_wasb_system.py create mode 100644 tests/test_utils/sftp_system_helpers.py diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index 3ebb0fc9e9965..bb844eb86dd64 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -683,7 +683,7 @@ dingding http discord http google amazon,apache.beam,apache.cassandra,cncf.kubernetes,facebook,microsoft.azure,microsoft.mssql,mysql,oracle,postgres,presto,salesforce,sftp,ssh,trino hashicorp google -microsoft.azure google,oracle +microsoft.azure google,oracle,sftp mysql amazon,presto,trino,vertica postgres amazon salesforce tableau diff --git a/airflow/providers/dependencies.json b/airflow/providers/dependencies.json index ec082c1bb537b..3c511465a7c43 100644 --- a/airflow/providers/dependencies.json +++ b/airflow/providers/dependencies.json @@ -59,7 +59,8 @@ ], "microsoft.azure": [ "google", - "oracle" + "oracle", + "sftp" ], "mysql": [ 
"amazon", diff --git a/airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py b/airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py new file mode 100644 index 0000000000000..32d1b96abd2cc --- /dev/null +++ b/airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py @@ -0,0 +1,77 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+import os + +from airflow import DAG +from airflow.operators.python import PythonOperator +from airflow.providers.microsoft.azure.operators.wasb_delete_blob import WasbDeleteBlobOperator +from airflow.providers.microsoft.azure.transfers.sftp_to_wasb import SFTPToWasbOperator +from airflow.providers.sftp.hooks.sftp import SFTPHook +from airflow.providers.sftp.operators.sftp import SFTPOperator +from airflow.utils.dates import days_ago + +AZURE_CONTAINER_NAME = os.environ.get("AZURE_CONTAINER_NAME", "airflow") +BLOB_PREFIX = os.environ.get("AZURE_BLOB_PREFIX", "airflow") +SFTP_SRC_PATH = os.environ.get("SFTP_SRC_PATH", "/sftp") +LOCAL_FILE_PATH = os.environ.get("LOCAL_SRC_PATH", "/tmp") +SAMPLE_FILENAME = os.environ.get("SFTP_SAMPLE_FILENAME", "sftp_to_wasb_test.txt") +FILE_COMPLETE_PATH = os.path.join(LOCAL_FILE_PATH, SAMPLE_FILENAME) +SFTP_FILE_COMPLETE_PATH = os.path.join(SFTP_SRC_PATH, SAMPLE_FILENAME) + + +def delete_sftp_file(): + """Delete a file at SFTP SERVER""" + SFTPHook(ssh_conn_id="sftp_default").delete_file(SFTP_FILE_COMPLETE_PATH) + + +with DAG( + "example_sftp_to_wasb", + schedule_interval=None, + start_date=days_ago(1), # Override to match your needs +) as dag: + + transfer_files_to_sftp_step = SFTPOperator( + task_id="transfer_files_from_local_to_sftp", + ssh_conn_id="sftp_default", + local_filepath=FILE_COMPLETE_PATH, + remote_filepath=SFTP_FILE_COMPLETE_PATH, + ) + + # [START how_to_sftp_to_wasb] + transfer_files_to_azure = SFTPToWasbOperator( + task_id="transfer_files_from_sftp_to_wasb", + # SFTP args + sftp_conn_id="sftp_default", + sftp_source_path=SFTP_SRC_PATH, + # AZURE args + wasb_conn_id="wasb_default", + container_name=AZURE_CONTAINER_NAME, + blob_prefix=BLOB_PREFIX, + ) + # [END how_to_sftp_to_wasb] + + delete_blob_file_step = WasbDeleteBlobOperator( + task_id="delete_blob_files", + wasb_conn_id="wasb_default", + container_name=AZURE_CONTAINER_NAME, + blob_name=BLOB_PREFIX + SAMPLE_FILENAME, + ) + + delete_sftp_step = 
PythonOperator(task_id="delete_sftp_file", python_callable=delete_sftp_file) + + transfer_files_to_sftp_step >> transfer_files_to_azure >> delete_blob_file_step >> delete_sftp_step diff --git a/airflow/providers/microsoft/azure/provider.yaml b/airflow/providers/microsoft/azure/provider.yaml index 8c67bbf3f5ed9..97e98909b62bc 100644 --- a/airflow/providers/microsoft/azure/provider.yaml +++ b/airflow/providers/microsoft/azure/provider.yaml @@ -179,6 +179,10 @@ transfers: target-integration-name: Google Cloud Storage (GCS) how-to-guide: /docs/apache-airflow-providers-microsoft-azure/operators/azure_blob_to_gcs.rst python-module: airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs + - source-integration-name: SSH File Transfer Protocol (SFTP) + target-integration-name: Microsoft Azure Blob Storage + how-to-guide: /docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst + python-module: airflow.providers.microsoft.azure.transfers.sftp_to_wasb hook-class-names: # deprecated - to be removed after providers add dependency on Airflow 2.2.0+ - airflow.providers.microsoft.azure.hooks.base_azure.AzureBaseHook diff --git a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py new file mode 100644 index 0000000000000..78917e57af372 --- /dev/null +++ b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py @@ -0,0 +1,194 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""This module contains SFTP to Azure Blob Storage operator.""" +import os +from collections import namedtuple +from tempfile import NamedTemporaryFile +from typing import Any, Dict, List, Optional, Tuple + +try: + from functools import cached_property +except ImportError: + from cached_property import cached_property + +from airflow.exceptions import AirflowException +from airflow.models import BaseOperator +from airflow.providers.microsoft.azure.hooks.wasb import WasbHook +from airflow.providers.sftp.hooks.sftp import SFTPHook +from airflow.utils.decorators import apply_defaults + +WILDCARD = "*" +SftpFile = namedtuple('SftpFile', 'sftp_file_path, blob_name') + + +class SFTPToWasbOperator(BaseOperator): + """ + Transfer files to Azure Blob Storage from SFTP server. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:SFTPToWasbOperator` + + :param sftp_source_path: The sftp remote path. This is the specified file path + for downloading the single file or multiple files from the SFTP server. + You can use only one wildcard within your path. The wildcard can appear + inside the path or at the end of the path. + :type sftp_source_path: str + :param container_name: Name of the container. + :type container_name: str + :param blob_prefix: Prefix to name a blob. + :type blob_prefix: str + :param sftp_conn_id: The sftp connection id. The name or identifier for + establishing a connection to the SFTP server. + :type sftp_conn_id: str + :param wasb_conn_id: Reference to the wasb connection. 
+ :type wasb_conn_id: str + :param load_options: Optional keyword arguments that + ``WasbHook.load_file()`` takes. + :type load_options: dict + :param move_object: When move object is True, the object is moved instead + of copied to the new location. This is the equivalent of a mv command + as opposed to a cp command. + :type move_object: bool + """ + + template_fields = ("sftp_source_path", "container_name", "blob_prefix") + + @apply_defaults + def __init__( + self, + *, + sftp_source_path: str, + container_name: str, + blob_prefix: str = "", + sftp_conn_id: str = "sftp_default", + wasb_conn_id: str = 'wasb_default', + load_options: Optional[dict] = None, + move_object: bool = False, + **kwargs, + ) -> None: + super().__init__(**kwargs) + + self.sftp_source_path = sftp_source_path + self.blob_prefix = blob_prefix + self.sftp_conn_id = sftp_conn_id + self.wasb_conn_id = wasb_conn_id + self.container_name = container_name + self.wasb_conn_id = wasb_conn_id + self.load_options = load_options or {} + self.move_object = move_object + + def dry_run(self) -> None: + super().dry_run() + sftp_files: List[SftpFile] = self.get_sftp_files_map() + for file in sftp_files: + self.log.info( + 'Process will upload file from (SFTP) %s to wasb://%s as %s', + file.sftp_file_path, + self.container_name, + file.blob_name, + ) + if self.move_object: + self.log.info("Executing delete of %s", file) + + def execute(self, context: Optional[Dict[Any, Any]]) -> None: + """Upload a file from SFTP to Azure Blob Storage.""" + sftp_files: List[SftpFile] = self.get_sftp_files_map() + uploaded_files = self.copy_files_to_wasb(sftp_files) + if self.move_object: + self.delete_files(uploaded_files) + + def get_sftp_files_map(self) -> List[SftpFile]: + """Get SFTP files from the source path, it may use a WILDCARD to this end.""" + sftp_files = [] + + sftp_complete_path, prefix, delimiter = self.get_tree_behavior() + + found_files, _, _ = self.sftp_hook.get_tree_map( + sftp_complete_path, prefix=prefix, 
delimiter=delimiter + ) + + self.log.info("Found %s files at sftp source path: %s", str(len(found_files)), self.sftp_source_path) + + for file in found_files: + future_blob_name = self.get_future_blob_name(file) + sftp_files.append(SftpFile(file, future_blob_name)) + + return sftp_files + + def get_tree_behavior(self) -> Tuple[str, Optional[str], Optional[str]]: + """Extracts from source path the tree behavior to interact with the remote folder""" + self.check_wildcards_limit() + + if self.source_path_contains_wildcard: + + prefix, delimiter = self.sftp_source_path.split(WILDCARD, 1) + + sftp_complete_path = os.path.dirname(prefix) + + return sftp_complete_path, prefix, delimiter + + return self.sftp_source_path, None, None + + def check_wildcards_limit(self) -> Any: + """Check if there is multiple Wildcard.""" + total_wildcards = self.sftp_source_path.count(WILDCARD) + if total_wildcards > 1: + raise AirflowException( + "Only one wildcard '*' is allowed in sftp_source_path parameter. " + "Found {} in {}.".format(total_wildcards, self.sftp_source_path) + ) + + @property + def source_path_contains_wildcard(self) -> bool: + """Does source path contains a wildcard""" + return WILDCARD in self.sftp_source_path + + @cached_property + def sftp_hook(self) -> SFTPHook: + """Property of sftp hook to be re-used.""" + return SFTPHook(self.sftp_conn_id) + + def get_future_blob_name(self, file: str) -> str: + """Get a blob name based by the previous name and a blob_prefix variable""" + return self.blob_prefix + os.path.basename(file) + + def copy_files_to_wasb(self, sftp_files: List[SftpFile]) -> List[str]: + """Upload a list of files from sftp_files to Azure Blob Storage with a new Blob Name.""" + uploaded_files = [] + wasb_hook = WasbHook(wasb_conn_id=self.wasb_conn_id) + for file in sftp_files: + with NamedTemporaryFile("w") as tmp: + self.sftp_hook.retrieve_file(file.sftp_file_path, tmp.name) + self.log.info( + 'Uploading %s to wasb://%s as %s', + file.sftp_file_path, + 
self.container_name, + file.blob_name, + ) + wasb_hook.load_file(tmp.name, self.container_name, file.blob_name, **self.load_options) + + uploaded_files.append(file.sftp_file_path) + + return uploaded_files + + def delete_files(self, uploaded_files: List[str]) -> None: + """Performs a move of a list of files at SFTP to Azure Blob Storage.""" + for sftp_file_path in uploaded_files: + self.log.info("Executing delete of %s", sftp_file_path) + self.sftp_hook.delete_file(sftp_file_path) diff --git a/docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst b/docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst new file mode 100644 index 0000000000000..d0d6486676760 --- /dev/null +++ b/docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst @@ -0,0 +1,67 @@ + + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + + +Azure Blob Storage Transfer Operator +==================================== +The Blob service stores text and binary data as objects in the cloud. +The Blob service offers the following three resources: the storage account, containers, and blobs. +Within your storage account, containers provide a way to organize sets of blobs. 
+For more information about the service visit `Azure Blob Storage API documentation `_. + +Before you begin +^^^^^^^^^^^^^^^^ +Before using Blob Storage within Airflow you need to authenticate your account with Token, Login and Password. +Please follow Azure +`instructions `_ +to do it. + +TOKEN should be added to the Connection in Airflow in JSON format, Login and Password as plain text. +You can check `how to do such connection `_. + +See the following example. +Set values for these fields: + +.. code-block:: + + SFTP Conn Id: sftp_default + WASB Conn Id: wasb_default + Login: Storage Account Name + Password: KEY1 + Extra: {"sas_token": "TOKEN"} + +.. contents:: + :depth: 1 + :local: + +.. _howto/operator:SFTPToWasbOperator: + +Transfer Data from SFTP Source Path to Blob Storage +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +The operator transfers data from the SFTP source path to the specified container in Azure Blob Storage. + +To transfer files from an SFTP server to Azure Blob Storage use: +:class:`~airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPToWasbOperator` +Example usage: + +.. exampleinclude:: /../../airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py + :language: python + :dedent: 4 + :start-after: [START how_to_sftp_to_wasb] + :end-before: [END how_to_sftp_to_wasb] diff --git a/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb.py b/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb.py new file mode 100644 index 0000000000000..dfd5f7f01ccb3 --- /dev/null +++ b/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb.py @@ -0,0 +1,253 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import unittest +from unittest import mock + +from airflow import AirflowException +from airflow.providers.microsoft.azure.transfers.sftp_to_wasb import SftpFile, SFTPToWasbOperator + +TASK_ID = "test-gcs-to-sftp-operator" +WASB_CONN_ID = "wasb_default" +SFTP_CONN_ID = "ssh_default" + +CONTAINER_NAME = "test-container" +WILDCARD_PATH = "main_dir/*" +WILDCARD_FILE_NAME = "main_dir/test_object*.json" +SOURCE_PATH_NO_WILDCARD = "main_dir/" +SOURCE_OBJECT_MULTIPLE_WILDCARDS = "main_dir/csv/*/test_*.csv" +BLOB_PREFIX = "sponge-bob" +EXPECTED_BLOB_NAME = "test_object3.json" +EXPECTED_FILES = [SOURCE_PATH_NO_WILDCARD + EXPECTED_BLOB_NAME] + + +# pylint: disable=unused-argument +class TestSFTPToWasbOperator(unittest.TestCase): + def test_init(self): + operator = SFTPToWasbOperator( + task_id=TASK_ID, + sftp_source_path=SOURCE_PATH_NO_WILDCARD, + sftp_conn_id=SFTP_CONN_ID, + container_name=CONTAINER_NAME, + blob_prefix=BLOB_PREFIX, + wasb_conn_id=WASB_CONN_ID, + move_object=False, + ) + assert operator.sftp_source_path == SOURCE_PATH_NO_WILDCARD + assert operator.sftp_conn_id == SFTP_CONN_ID + assert operator.container_name == CONTAINER_NAME + assert operator.wasb_conn_id == WASB_CONN_ID + assert operator.blob_prefix == BLOB_PREFIX + + @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.WasbHook', autospec=True) + def test_execute_more_than_one_wildcard_exception(self, mock_hook): + 
operator = SFTPToWasbOperator( + task_id=TASK_ID, + sftp_source_path=SOURCE_OBJECT_MULTIPLE_WILDCARDS, + sftp_conn_id=SFTP_CONN_ID, + container_name=CONTAINER_NAME, + blob_prefix=BLOB_PREFIX, + wasb_conn_id=WASB_CONN_ID, + move_object=False, + ) + with self.assertRaises(AirflowException) as cm: + operator.check_wildcards_limit() + + err = cm.exception + assert "Only one wildcard '*' is allowed" in str(err) + + def test_get_sftp_tree_behavior(self): + operator = SFTPToWasbOperator( + task_id=TASK_ID, + sftp_source_path=WILDCARD_PATH, + sftp_conn_id=SFTP_CONN_ID, + container_name=CONTAINER_NAME, + wasb_conn_id=WASB_CONN_ID, + move_object=False, + ) + sftp_complete_path, prefix, delimiter = operator.get_tree_behavior() + + assert sftp_complete_path == 'main_dir', "not matched at expected complete path" + assert prefix == 'main_dir/', "Prefix must be EQUAL TO wildcard" + assert delimiter == "", "Delimiter must be empty" + + def test_get_sftp_tree_behavior_without_wildcard(self): + operator = SFTPToWasbOperator( + task_id=TASK_ID, + sftp_source_path=SOURCE_PATH_NO_WILDCARD, + sftp_conn_id=SFTP_CONN_ID, + container_name=CONTAINER_NAME, + wasb_conn_id=WASB_CONN_ID, + move_object=False, + ) + sftp_complete_path, prefix, delimiter = operator.get_tree_behavior() + + assert sftp_complete_path == 'main_dir/', "not matched at expected complete path" + assert prefix is None, "Prefix must be NONE when no wildcard" + assert delimiter is None, "Delimiter must be none" + + def test_source_path_contains_wildcard(self): + operator = SFTPToWasbOperator( + task_id=TASK_ID, + sftp_source_path=WILDCARD_PATH, + sftp_conn_id=SFTP_CONN_ID, + container_name=CONTAINER_NAME, + wasb_conn_id=WASB_CONN_ID, + move_object=False, + ) + output = operator.source_path_contains_wildcard + assert output is True, "This path contains a wildpath" + + def test_source_path_not_contains_wildcard(self): + operator = SFTPToWasbOperator( + task_id=TASK_ID, + sftp_source_path=SOURCE_PATH_NO_WILDCARD, + 
sftp_conn_id=SFTP_CONN_ID, + container_name=CONTAINER_NAME, + wasb_conn_id=WASB_CONN_ID, + move_object=False, + ) + output = operator.source_path_contains_wildcard + assert output is False, "This path does not contains a wildpath" + + @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.WasbHook') + @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPHook') + def test_get_sftp_files_map_no_wildcard(self, sftp_hook, mock_hook): + sftp_hook.return_value.get_tree_map.return_value = [ + EXPECTED_FILES, + [], + [], + ] + operator = SFTPToWasbOperator( + task_id=TASK_ID, + sftp_source_path=SOURCE_PATH_NO_WILDCARD, + sftp_conn_id=SFTP_CONN_ID, + container_name=CONTAINER_NAME, + wasb_conn_id=WASB_CONN_ID, + move_object=True, + ) + files = operator.get_sftp_files_map() + + assert len(files) == 1, "no matched at expected found files" + assert files[0].blob_name == EXPECTED_BLOB_NAME, "expected blob name not matched" + + @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.WasbHook') + @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPHook') + def test_copy_files_to_wasb(self, sftp_hook, mock_hook): + operator = SFTPToWasbOperator( + task_id=TASK_ID, + sftp_source_path=SOURCE_PATH_NO_WILDCARD, + sftp_conn_id=SFTP_CONN_ID, + container_name=CONTAINER_NAME, + wasb_conn_id=WASB_CONN_ID, + move_object=True, + ) + + sftp_files = [SftpFile(EXPECTED_FILES[0], EXPECTED_BLOB_NAME)] + files = operator.copy_files_to_wasb(sftp_files) + + operator.sftp_hook.retrieve_file.assert_has_calls([mock.call("main_dir/test_object3.json", mock.ANY)]) + + mock_hook.return_value.load_file.assert_called_once_with(mock.ANY, CONTAINER_NAME, EXPECTED_BLOB_NAME) + + assert len(files) == 1, "no matched at expected uploaded files" + + @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPHook') + def test_delete_files(self, sftp_hook): + sftp_mock = sftp_hook.return_value + + operator = SFTPToWasbOperator( + 
task_id=TASK_ID, + sftp_source_path=SOURCE_PATH_NO_WILDCARD, + sftp_conn_id=SFTP_CONN_ID, + container_name=CONTAINER_NAME, + wasb_conn_id=WASB_CONN_ID, + move_object=True, + ) + + sftp_file_paths = EXPECTED_FILES + operator.delete_files(sftp_file_paths) + + sftp_mock.delete_file.assert_has_calls([mock.call(EXPECTED_FILES[0])]) + + @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.WasbHook') + @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPHook') + def test_execute(self, sftp_hook, mock_hook): + operator = SFTPToWasbOperator( + task_id=TASK_ID, + sftp_source_path=WILDCARD_FILE_NAME, + sftp_conn_id=SFTP_CONN_ID, + container_name=CONTAINER_NAME, + wasb_conn_id=WASB_CONN_ID, + move_object=False, + ) + + sftp_hook.return_value.get_tree_map.return_value = [ + ["main_dir/test_object.json"], + [], + [], + ] + + operator.execute(None) + + sftp_hook.return_value.get_tree_map.assert_called_with( + "main_dir", prefix="main_dir/test_object", delimiter=".json" + ) + + sftp_hook.return_value.retrieve_file.assert_has_calls( + [mock.call("main_dir/test_object.json", mock.ANY)] + ) + + mock_hook.return_value.load_file.assert_called_once_with(mock.ANY, CONTAINER_NAME, "test_object.json") + + sftp_hook.return_value.delete_file.assert_not_called() + + @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.WasbHook') + @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPHook') + def test_execute_moved_files(self, sftp_hook, mock_hook): + operator = SFTPToWasbOperator( + task_id=TASK_ID, + sftp_source_path=WILDCARD_FILE_NAME, + sftp_conn_id=SFTP_CONN_ID, + container_name=CONTAINER_NAME, + wasb_conn_id=WASB_CONN_ID, + move_object=True, + blob_prefix=BLOB_PREFIX, + ) + + sftp_hook.return_value.get_tree_map.return_value = [ + ["main_dir/test_object.json"], + [], + [], + ] + + operator.execute(None) + + sftp_hook.return_value.get_tree_map.assert_called_with( + "main_dir", prefix="main_dir/test_object", 
delimiter=".json" + ) + + sftp_hook.return_value.retrieve_file.assert_has_calls( + [mock.call("main_dir/test_object.json", mock.ANY)] + ) + + mock_hook.return_value.load_file.assert_called_once_with( + mock.ANY, CONTAINER_NAME, BLOB_PREFIX + "test_object.json" + ) + assert sftp_hook.return_value.delete_file.called is True, "File must be moved" diff --git a/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb_system.py b/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb_system.py new file mode 100644 index 0000000000000..600f84fc33585 --- /dev/null +++ b/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb_system.py @@ -0,0 +1,57 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ + +import os + +import pytest + +from airflow.providers.microsoft.azure.example_dags.example_sftp_to_wasb import ( + FILE_COMPLETE_PATH, + LOCAL_FILE_PATH, + SAMPLE_FILENAME, +) +from tests.test_utils.azure_system_helpers import ( + AZURE_DAG_FOLDER, + AzureSystemTest, + provide_wasb_default_connection, +) +from tests.test_utils.sftp_system_helpers import provide_sftp_default_connection + +CREDENTIALS_DIR = os.environ.get('CREDENTIALS_DIR', '/files/airflow-breeze-config/keys') +SFTP_DEFAULT_KEY = 'sftp_key.json' +WASB_DEFAULT_KEY = 'wasb_key.json' +CREDENTIALS_SFTP_PATH = os.path.join(CREDENTIALS_DIR, SFTP_DEFAULT_KEY) +CREDENTIALS_WASB_PATH = os.path.join(CREDENTIALS_DIR, WASB_DEFAULT_KEY) + + +@pytest.mark.backend('postgres', 'mysql') +@pytest.mark.credential_file(WASB_DEFAULT_KEY) +@pytest.mark.credential_file(SFTP_DEFAULT_KEY) +class TestSFTPToWasbSystem(AzureSystemTest): + def setUp(self): + super().setUp() + self.create_dummy_file(SAMPLE_FILENAME, LOCAL_FILE_PATH) + + def tearDown(self): + os.remove(FILE_COMPLETE_PATH) + super().tearDown() + + @provide_wasb_default_connection(CREDENTIALS_WASB_PATH) + @provide_sftp_default_connection(CREDENTIALS_SFTP_PATH) + def test_run_example_file_to_wasb(self): + self.run_dag('example_sftp_to_wasb', AZURE_DAG_FOLDER) diff --git a/tests/test_utils/sftp_system_helpers.py b/tests/test_utils/sftp_system_helpers.py new file mode 100644 index 0000000000000..accf3c9504591 --- /dev/null +++ b/tests/test_utils/sftp_system_helpers.py @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import json +import os +from contextlib import contextmanager + +from airflow.exceptions import AirflowException +from airflow.models import Connection +from airflow.utils.process_utils import patch_environ + +SFTP_CONNECTION_ID = os.environ.get("SFTP_CONNECTION_ID", "sftp_default") + + +@contextmanager +def provide_sftp_default_connection(key_file_path: str): + """ + Context manager to provide a temporary value for sftp_default connection + + :param key_file_path: Path to file with sftp_default credentials .json file. + :type key_file_path: str + """ + if not key_file_path.endswith(".json"): + raise AirflowException("Use a JSON key file.") + with open(key_file_path) as credentials: + creds = json.load(credentials) + conn = Connection( + conn_id=SFTP_CONNECTION_ID, + conn_type="ssh", + port=creds.get("port", None), + host=creds.get("host", None), + login=creds.get("login", None), + password=creds.get("password", None), + extra=json.dumps(creds.get('extra', None)), + ) + with patch_environ({f"AIRFLOW_CONN_{conn.conn_id.upper()}": conn.get_uri()}): + yield From 4f0824535ad89010d98b638045727b45f838b163 Mon Sep 17 00:00:00 2001 From: Guilherme da Silva Goncalves Date: Sun, 10 Oct 2021 22:18:18 -0300 Subject: [PATCH 02/23] Removes apply_defaults decorator --- airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py index 78917e57af372..93f0a9b6468f3 100644 --- 
a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py +++ b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py @@ -69,7 +69,6 @@ class SFTPToWasbOperator(BaseOperator): template_fields = ("sftp_source_path", "container_name", "blob_prefix") - @apply_defaults def __init__( self, *, From e3ae3c14b695374a2103afe3013e9c2e607ef8af Mon Sep 17 00:00:00 2001 From: Guilherme da Silva Goncalves Date: Mon, 11 Oct 2021 09:30:15 -0300 Subject: [PATCH 03/23] Removes apply_defaults decorator --- airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py index 93f0a9b6468f3..5424ebded7940 100644 --- a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py +++ b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py @@ -30,7 +30,6 @@ from airflow.models import BaseOperator from airflow.providers.microsoft.azure.hooks.wasb import WasbHook from airflow.providers.sftp.hooks.sftp import SFTPHook -from airflow.utils.decorators import apply_defaults WILDCARD = "*" SftpFile = namedtuple('SftpFile', 'sftp_file_path, blob_name') From e1c2238584901f3c76b3a0c94273fb464018ea29 Mon Sep 17 00:00:00 2001 From: wolvery Date: Tue, 12 Oct 2021 23:03:57 -0300 Subject: [PATCH 04/23] Uses task flow API to example_sftp_to_wasb.py --- .../azure/example_dags/example_sftp_to_wasb.py | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py b/airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py index 32d1b96abd2cc..b348253e58f10 100644 --- a/airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py +++ b/airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py @@ -16,14 +16,15 @@ # specific language governing permissions and limitations # under the License. 
import os +from datetime import datetime from airflow import DAG +from airflow.decorators import task from airflow.operators.python import PythonOperator from airflow.providers.microsoft.azure.operators.wasb_delete_blob import WasbDeleteBlobOperator from airflow.providers.microsoft.azure.transfers.sftp_to_wasb import SFTPToWasbOperator from airflow.providers.sftp.hooks.sftp import SFTPHook from airflow.providers.sftp.operators.sftp import SFTPOperator -from airflow.utils.dates import days_ago AZURE_CONTAINER_NAME = os.environ.get("AZURE_CONTAINER_NAME", "airflow") BLOB_PREFIX = os.environ.get("AZURE_BLOB_PREFIX", "airflow") @@ -34,20 +35,19 @@ SFTP_FILE_COMPLETE_PATH = os.path.join(SFTP_SRC_PATH, SAMPLE_FILENAME) +@task def delete_sftp_file(): """Delete a file at SFTP SERVER""" - SFTPHook(ssh_conn_id="sftp_default").delete_file(SFTP_FILE_COMPLETE_PATH) + SFTPHook().delete_file(SFTP_FILE_COMPLETE_PATH) with DAG( "example_sftp_to_wasb", schedule_interval=None, - start_date=days_ago(1), # Override to match your needs + start_date=datetime(2021, 1, 1), # Override to match your needs ) as dag: - transfer_files_to_sftp_step = SFTPOperator( task_id="transfer_files_from_local_to_sftp", - ssh_conn_id="sftp_default", local_filepath=FILE_COMPLETE_PATH, remote_filepath=SFTP_FILE_COMPLETE_PATH, ) @@ -56,10 +56,8 @@ def delete_sftp_file(): transfer_files_to_azure = SFTPToWasbOperator( task_id="transfer_files_from_sftp_to_wasb", # SFTP args - sftp_conn_id="sftp_default", sftp_source_path=SFTP_SRC_PATH, # AZURE args - wasb_conn_id="wasb_default", container_name=AZURE_CONTAINER_NAME, blob_prefix=BLOB_PREFIX, ) @@ -67,11 +65,8 @@ def delete_sftp_file(): delete_blob_file_step = WasbDeleteBlobOperator( task_id="delete_blob_files", - wasb_conn_id="wasb_default", container_name=AZURE_CONTAINER_NAME, blob_name=BLOB_PREFIX + SAMPLE_FILENAME, ) - delete_sftp_step = PythonOperator(task_id="delete_sftp_file", python_callable=delete_sftp_file) - - transfer_files_to_sftp_step >> 
transfer_files_to_azure >> delete_blob_file_step >> delete_sftp_step + transfer_files_to_sftp_step >> transfer_files_to_azure >> delete_blob_file_step >> delete_sftp_file From e9a1ee1c780f6413ca650b839696b25733939171 Mon Sep 17 00:00:00 2001 From: Guilherme da Silva Goncalves Date: Wed, 13 Oct 2021 10:32:17 -0300 Subject: [PATCH 05/23] Fix task in example_sftp_to_wasb.py --- .../microsoft/azure/example_dags/example_sftp_to_wasb.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py b/airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py index b348253e58f10..a4229ce77b8d8 100644 --- a/airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py +++ b/airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py @@ -20,7 +20,6 @@ from airflow import DAG from airflow.decorators import task -from airflow.operators.python import PythonOperator from airflow.providers.microsoft.azure.operators.wasb_delete_blob import WasbDeleteBlobOperator from airflow.providers.microsoft.azure.transfers.sftp_to_wasb import SFTPToWasbOperator from airflow.providers.sftp.hooks.sftp import SFTPHook @@ -69,4 +68,4 @@ def delete_sftp_file(): blob_name=BLOB_PREFIX + SAMPLE_FILENAME, ) - transfer_files_to_sftp_step >> transfer_files_to_azure >> delete_blob_file_step >> delete_sftp_file + transfer_files_to_sftp_step >> transfer_files_to_azure >> delete_blob_file_step >> delete_sftp_file() From a0631d2de7b2e04a956e53b656c220585d9e1bf5 Mon Sep 17 00:00:00 2001 From: wolvery Date: Thu, 2 Dec 2021 00:18:27 -0300 Subject: [PATCH 06/23] Enriches documentation with SFTP and WASB connection.
--- .../operators/sftp_to_wasb.rst | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst b/docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst index d0d6486676760..e4842f273604d 100644 --- a/docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst +++ b/docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst @@ -32,8 +32,9 @@ Please follow Azure `instructions `_ to do it. -TOKEN should be added to the Connection in Airflow in JSON format, Login and Password as plain text. -You can check `how to do such connection `_. +You can check `https://airflow.apache.org/docs/apache-airflow-providers-microsoft-azure/stable/connections/wasb.htmlhow to do such connection `_. +Please, check `WASB `_ and `SFTP `_ +for setting up connection fields. See following example. Set values for these fields: @@ -42,9 +43,6 @@ Set values for these fields: SFTP Conn Id: sftp_default WASB Conn Id: wasb_default - Login: Storage Account Name - Password: KEY1 - Extra: {"sas_token": "TOKEN"} .. 
contents:: :depth: 1 From 5e2677e4668b339eb49dab1d37924c8986aca053 Mon Sep 17 00:00:00 2001 From: wolvery Date: Thu, 2 Dec 2021 00:28:56 -0300 Subject: [PATCH 07/23] Changes method name to get_full_path_blob --- airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py index 5424ebded7940..58350d56346e4 100644 --- a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py +++ b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py @@ -124,7 +124,7 @@ def get_sftp_files_map(self) -> List[SftpFile]: self.log.info("Found %s files at sftp source path: %s", str(len(found_files)), self.sftp_source_path) for file in found_files: - future_blob_name = self.get_future_blob_name(file) + future_blob_name = self.get_full_path_blob(file) sftp_files.append(SftpFile(file, future_blob_name)) return sftp_files @@ -162,7 +162,7 @@ def sftp_hook(self) -> SFTPHook: """Property of sftp hook to be re-used.""" return SFTPHook(self.sftp_conn_id) - def get_future_blob_name(self, file: str) -> str: + def get_full_path_blob(self, file: str) -> str: """Get a blob name based by the previous name and a blob_prefix variable""" return self.blob_prefix + os.path.basename(file) From c5cee86a05073c23ecb071d84f8ab27fade3fe90 Mon Sep 17 00:00:00 2001 From: wolvery Date: Thu, 2 Dec 2021 21:17:22 -0300 Subject: [PATCH 08/23] Adds variable to expose overwrite mode. 
--- .../providers/microsoft/azure/transfers/sftp_to_wasb.py | 8 +++++++- .../operators/sftp_to_wasb.rst | 3 ++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py index 58350d56346e4..3b5d886105817 100644 --- a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py +++ b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py @@ -63,6 +63,11 @@ class SFTPToWasbOperator(BaseOperator): :param move_object: When move object is True, the object is moved instead of copied to the new location. This is the equivalent of a mv command as opposed to a cp command. + :param wasb_overwrite_object: Whether the blob to be uploaded + should overwrite the current data. + When wasb_overwrite_object is True, it will overwrite the existing data. + If set to False, the operation might fail with + ResourceExistsError in case a blob object already exists. :type move_object: bool """ @@ -78,6 +83,7 @@ def __init__( wasb_conn_id: str = 'wasb_default', load_options: Optional[dict] = None, move_object: bool = False, + wasb_overwrite_object: bool = False, **kwargs, ) -> None: super().__init__(**kwargs) @@ -88,7 +94,7 @@ def __init__( self.wasb_conn_id = wasb_conn_id self.container_name = container_name self.wasb_conn_id = wasb_conn_id - self.load_options = load_options or {} + self.load_options = load_options or {"overwrite": wasb_overwrite_object} self.move_object = move_object def dry_run(self) -> None: diff --git a/docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst b/docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst index e4842f273604d..e8cf37da13f47 100644 --- a/docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst +++ b/docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst @@ -33,7 +33,8 @@ Please follow Azure to do it. 
You can check `https://airflow.apache.org/docs/apache-airflow-providers-microsoft-azure/stable/connections/wasb.htmlhow to do such connection `_. -Please, check `WASB `_ and `SFTP `_ +Please, check `WASB `_ +and `SFTP `_ for setting up connection fields. See following example. From 241b78d6b5b9114a3225a2a82bf55c49127e247d Mon Sep 17 00:00:00 2001 From: wolvery Date: Sat, 4 Dec 2021 20:35:55 -0300 Subject: [PATCH 09/23] Adds f-string usage --- airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py index 3b5d886105817..46b13e85004f4 100644 --- a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py +++ b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py @@ -155,7 +155,7 @@ def check_wildcards_limit(self) -> Any: if total_wildcards > 1: raise AirflowException( "Only one wildcard '*' is allowed in sftp_source_path parameter. " - "Found {} in {}.".format(total_wildcards, self.sftp_source_path) + f"Found {total_wildcards} in {self.sftp_source_path}." ) @property From 4a9f17f70839c7c0d1af787a171491c789f78d1d Mon Sep 17 00:00:00 2001 From: wolvery Date: Wed, 8 Dec 2021 11:19:30 -0300 Subject: [PATCH 10/23] Fix tests Fix tests Fix docs. --- .../operators/sftp_to_wasb.rst | 3 +-- .../microsoft/azure/transfers/test_sftp_to_wasb.py | 10 +++++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst b/docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst index e8cf37da13f47..486e2076db6c5 100644 --- a/docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst +++ b/docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst @@ -32,10 +32,9 @@ Please follow Azure `instructions `_ to do it. 
-You can check `https://airflow.apache.org/docs/apache-airflow-providers-microsoft-azure/stable/connections/wasb.htmlhow to do such connection `_. Please, check `WASB `_ and `SFTP `_ -for setting up connection fields. +for how to do such `connection `_. See following example. Set values for these fields: diff --git a/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb.py b/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb.py index dfd5f7f01ccb3..87935f9050174 100644 --- a/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb.py +++ b/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb.py @@ -164,7 +164,9 @@ def test_copy_files_to_wasb(self, sftp_hook, mock_hook): operator.sftp_hook.retrieve_file.assert_has_calls([mock.call("main_dir/test_object3.json", mock.ANY)]) - mock_hook.return_value.load_file.assert_called_once_with(mock.ANY, CONTAINER_NAME, EXPECTED_BLOB_NAME) + mock_hook.return_value.load_file.assert_called_once_with( + mock.ANY, CONTAINER_NAME, EXPECTED_BLOB_NAME, overwrite=False + ) assert len(files) == 1, "no matched at expected uploaded files" @@ -214,7 +216,9 @@ def test_execute(self, sftp_hook, mock_hook): [mock.call("main_dir/test_object.json", mock.ANY)] ) - mock_hook.return_value.load_file.assert_called_once_with(mock.ANY, CONTAINER_NAME, "test_object.json") + mock_hook.return_value.load_file.assert_called_once_with( + mock.ANY, CONTAINER_NAME, "test_object.json", overwrite=False + ) sftp_hook.return_value.delete_file.assert_not_called() @@ -248,6 +252,6 @@ def test_execute_moved_files(self, sftp_hook, mock_hook): ) mock_hook.return_value.load_file.assert_called_once_with( - mock.ANY, CONTAINER_NAME, BLOB_PREFIX + "test_object.json" + mock.ANY, CONTAINER_NAME, BLOB_PREFIX + "test_object.json", overwrite=False ) assert sftp_hook.return_value.delete_file.called is True, "File must be moved" From 8c1e860de4c3c186ee8d0f206a12229743c7a495 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guilherme=20Da=20Silva=20Gon=C3=A7alves?= 
Date: Sat, 11 Dec 2021 02:25:39 -0300 Subject: [PATCH 11/23] Update tests/providers/microsoft/azure/transfers/test_sftp_to_wasb.py Co-authored-by: Josh Fell <48934154+josh-fell@users.noreply.github.com> --- tests/providers/microsoft/azure/transfers/test_sftp_to_wasb.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb.py b/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb.py index 87935f9050174..1c348353cae1e 100644 --- a/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb.py +++ b/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb.py @@ -37,7 +37,6 @@ EXPECTED_FILES = [SOURCE_PATH_NO_WILDCARD + EXPECTED_BLOB_NAME] -# pylint: disable=unused-argument class TestSFTPToWasbOperator(unittest.TestCase): def test_init(self): operator = SFTPToWasbOperator( From ebd8ac8068698c12dec6b104d9897167f080a786 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guilherme=20Da=20Silva=20Gon=C3=A7alves?= Date: Sat, 11 Dec 2021 02:26:01 -0300 Subject: [PATCH 12/23] Update airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py Co-authored-by: Josh Fell <48934154+josh-fell@users.noreply.github.com> --- airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py index 46b13e85004f4..69f0d9b32a96a 100644 --- a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py +++ b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py @@ -169,7 +169,7 @@ def sftp_hook(self) -> SFTPHook: return SFTPHook(self.sftp_conn_id) def get_full_path_blob(self, file: str) -> str: - """Get a blob name based by the previous name and a blob_prefix variable""" + """Get a blob name based on the previous name and a blob_prefix variable""" return self.blob_prefix + os.path.basename(file) def copy_files_to_wasb(self, sftp_files: List[SftpFile]) 
-> List[str]: From 276a89712106d24ed97a74803ecdbfc0c98a4304 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guilherme=20Da=20Silva=20Gon=C3=A7alves?= Date: Sat, 11 Dec 2021 02:26:08 -0300 Subject: [PATCH 13/23] Update airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py Co-authored-by: Josh Fell <48934154+josh-fell@users.noreply.github.com> --- airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py index 69f0d9b32a96a..ce1f8f69cd47e 100644 --- a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py +++ b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py @@ -160,7 +160,7 @@ def check_wildcards_limit(self) -> Any: @property def source_path_contains_wildcard(self) -> bool: - """Does source path contains a wildcard""" + """Checks if the SFTP source path contains a wildcard.""" return WILDCARD in self.sftp_source_path @cached_property From a2175e3264df95cefe3d91ba33f364562f151081 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guilherme=20Da=20Silva=20Gon=C3=A7alves?= Date: Sat, 11 Dec 2021 02:26:14 -0300 Subject: [PATCH 14/23] Update airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py Co-authored-by: Josh Fell <48934154+josh-fell@users.noreply.github.com> --- airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py index ce1f8f69cd47e..3f93f1276dfeb 100644 --- a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py +++ b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py @@ -150,7 +150,7 @@ def get_tree_behavior(self) -> Tuple[str, Optional[str], Optional[str]]: return self.sftp_source_path, None, None def check_wildcards_limit(self) -> Any: - """Check if there 
is multiple Wildcard.""" + """Check if there are multiple wildcards used in the SFTP source path.""" total_wildcards = self.sftp_source_path.count(WILDCARD) if total_wildcards > 1: raise AirflowException( From 224227f106411aed04e8c6f966de15219c44fb04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guilherme=20Da=20Silva=20Gon=C3=A7alves?= Date: Sat, 11 Dec 2021 02:26:20 -0300 Subject: [PATCH 15/23] Update airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py Co-authored-by: Josh Fell <48934154+josh-fell@users.noreply.github.com> --- airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py index 3f93f1276dfeb..f322a202e986a 100644 --- a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py +++ b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py @@ -149,7 +149,7 @@ def get_tree_behavior(self) -> Tuple[str, Optional[str], Optional[str]]: return self.sftp_source_path, None, None - def check_wildcards_limit(self) -> Any: + def check_wildcards_limit(self) -> None: """Check if there are multiple wildcards used in the SFTP source path.""" total_wildcards = self.sftp_source_path.count(WILDCARD) if total_wildcards > 1: From f8545a0333d91b01832c3b890b5e74a3b5d60c6a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guilherme=20Da=20Silva=20Gon=C3=A7alves?= Date: Sat, 11 Dec 2021 02:26:28 -0300 Subject: [PATCH 16/23] Update airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py Co-authored-by: Josh Fell <48934154+josh-fell@users.noreply.github.com> --- airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py index f322a202e986a..580b3e9ea1a59 100644 --- 
a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py +++ b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py @@ -110,7 +110,7 @@ def dry_run(self) -> None: if self.move_object: self.log.info("Executing delete of %s", file) - def execute(self, context: Optional[Dict[Any, Any]]) -> None: + def execute(self, context: Dict) -> None: """Upload a file from SFTP to Azure Blob Storage.""" sftp_files: List[SftpFile] = self.get_sftp_files_map() uploaded_files = self.copy_files_to_wasb(sftp_files) From dde29b11a5713cb2f5559087b95612e268db8637 Mon Sep 17 00:00:00 2001 From: wolvery Date: Sat, 11 Dec 2021 02:29:33 -0300 Subject: [PATCH 17/23] Fix typing to Dict. --- airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py index 580b3e9ea1a59..fb3e8e3d178f1 100644 --- a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py +++ b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py @@ -19,7 +19,7 @@ import os from collections import namedtuple from tempfile import NamedTemporaryFile -from typing import Any, Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Tuple try: from functools import cached_property @@ -81,7 +81,7 @@ def __init__( blob_prefix: str = "", sftp_conn_id: str = "sftp_default", wasb_conn_id: str = 'wasb_default', - load_options: Optional[dict] = None, + load_options: Optional[Dict] = None, move_object: bool = False, wasb_overwrite_object: bool = False, **kwargs, From fe9f493bf2fa65f164dd8149865327f6f6fe76c3 Mon Sep 17 00:00:00 2001 From: wolvery Date: Sat, 11 Dec 2021 02:31:19 -0300 Subject: [PATCH 18/23] Disables catchup to example of sftp to wasb --- .../microsoft/azure/example_dags/example_sftp_to_wasb.py | 1 + 1 file changed, 1 insertion(+) diff --git 
a/airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py b/airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py index a4229ce77b8d8..d70ca31f2875b 100644 --- a/airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py +++ b/airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py @@ -43,6 +43,7 @@ def delete_sftp_file(): with DAG( "example_sftp_to_wasb", schedule_interval=None, + catchup=False, start_date=datetime(2021, 1, 1), # Override to match your needs ) as dag: transfer_files_to_sftp_step = SFTPOperator( From ed79e8ca520cf0e4fcf90b4ab7b3eca5808621ed Mon Sep 17 00:00:00 2001 From: wolvery Date: Mon, 13 Dec 2021 23:15:49 -0300 Subject: [PATCH 19/23] Fix cached property import --- airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py index fb3e8e3d178f1..a014665fa8ddf 100644 --- a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py +++ b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py @@ -17,13 +17,14 @@ # under the License. 
"""This module contains SFTP to Azure Blob Storage operator.""" import os +import sys from collections import namedtuple from tempfile import NamedTemporaryFile from typing import Dict, List, Optional, Tuple -try: +if sys.version_info >= (3, 8): from functools import cached_property -except ImportError: +else: from cached_property import cached_property from airflow.exceptions import AirflowException From e37ad5c3fb9a4ffe0fe853e02cb393eee479aa43 Mon Sep 17 00:00:00 2001 From: wolvery Date: Thu, 16 Dec 2021 23:36:39 -0300 Subject: [PATCH 20/23] Fix comment to delete_files method --- airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py index a014665fa8ddf..04e7b118706ba 100644 --- a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py +++ b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py @@ -193,7 +193,7 @@ def copy_files_to_wasb(self, sftp_files: List[SftpFile]) -> List[str]: return uploaded_files def delete_files(self, uploaded_files: List[str]) -> None: - """Performs a move of a list of files at SFTP to Azure Blob Storage.""" + """Delete files at SFTP which have been moved to Azure Blob Storage.""" for sftp_file_path in uploaded_files: self.log.info("Executing delete of %s", sftp_file_path) self.sftp_hook.delete_file(sftp_file_path) From b5ee6e8b0944bbb0c3805a1c3eacbfd14f693eaf Mon Sep 17 00:00:00 2001 From: wolvery Date: Mon, 20 Dec 2021 01:14:21 -0300 Subject: [PATCH 21/23] Removes unnecessary documentation. 
--- .../operators/sftp_to_wasb.rst | 4 ---- 1 file changed, 4 deletions(-) diff --git a/docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst b/docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst index 486e2076db6c5..b628681f4ee89 100644 --- a/docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst +++ b/docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst @@ -32,10 +32,6 @@ Please follow Azure `instructions `_ to do it. -Please, check `WASB `_ -and `SFTP `_ -for how to do such `connection `_. - See following example. Set values for these fields: From c8e87b067ba29a30a6574d077471131bb738c284 Mon Sep 17 00:00:00 2001 From: wolvery Date: Mon, 20 Dec 2021 01:16:21 -0300 Subject: [PATCH 22/23] Uses a better documentation --- airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py index 04e7b118706ba..a014665fa8ddf 100644 --- a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py +++ b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py @@ -193,7 +193,7 @@ def copy_files_to_wasb(self, sftp_files: List[SftpFile]) -> List[str]: return uploaded_files def delete_files(self, uploaded_files: List[str]) -> None: - """Delete files at SFTP which have been moved to Azure Blob Storage.""" + """Performs a move of a list of files at SFTP to Azure Blob Storage.""" for sftp_file_path in uploaded_files: self.log.info("Executing delete of %s", sftp_file_path) self.sftp_hook.delete_file(sftp_file_path) From f1441c1483381f729e109691b1ddf588f01bb6f4 Mon Sep 17 00:00:00 2001 From: wolvery Date: Mon, 20 Dec 2021 01:17:00 -0300 Subject: [PATCH 23/23] Uses a better documentation --- airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py index a014665fa8ddf..04e7b118706ba 100644 --- a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py +++ b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py @@ -193,7 +193,7 @@ def copy_files_to_wasb(self, sftp_files: List[SftpFile]) -> List[str]: return uploaded_files def delete_files(self, uploaded_files: List[str]) -> None: - """Performs a move of a list of files at SFTP to Azure Blob Storage.""" + """Delete files at SFTP which have been moved to Azure Blob Storage.""" for sftp_file_path in uploaded_files: self.log.info("Executing delete of %s", sftp_file_path) self.sftp_hook.delete_file(sftp_file_path)