diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index 3ebb0fc9e9965..bb844eb86dd64 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -683,7 +683,7 @@ dingding                   http
 discord                    http
 google                     amazon,apache.beam,apache.cassandra,cncf.kubernetes,facebook,microsoft.azure,microsoft.mssql,mysql,oracle,postgres,presto,salesforce,sftp,ssh,trino
 hashicorp                  google
-microsoft.azure            google,oracle
+microsoft.azure            google,oracle,sftp
 mysql                      amazon,presto,trino,vertica
 postgres                   amazon
 salesforce                 tableau
diff --git a/airflow/providers/dependencies.json b/airflow/providers/dependencies.json
index ec082c1bb537b..3c511465a7c43 100644
--- a/airflow/providers/dependencies.json
+++ b/airflow/providers/dependencies.json
@@ -59,7 +59,8 @@
   ],
   "microsoft.azure": [
     "google",
-    "oracle"
+    "oracle",
+    "sftp"
   ],
   "mysql": [
     "amazon",
diff --git a/airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py b/airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py
new file mode 100644
index 0000000000000..d70ca31f2875b
--- /dev/null
+++ b/airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+from datetime import datetime
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.providers.microsoft.azure.operators.wasb_delete_blob import WasbDeleteBlobOperator
+from airflow.providers.microsoft.azure.transfers.sftp_to_wasb import SFTPToWasbOperator
+from airflow.providers.sftp.hooks.sftp import SFTPHook
+from airflow.providers.sftp.operators.sftp import SFTPOperator
+
+AZURE_CONTAINER_NAME = os.environ.get("AZURE_CONTAINER_NAME", "airflow")
+BLOB_PREFIX = os.environ.get("AZURE_BLOB_PREFIX", "airflow")
+SFTP_SRC_PATH = os.environ.get("SFTP_SRC_PATH", "/sftp")
+LOCAL_FILE_PATH = os.environ.get("LOCAL_SRC_PATH", "/tmp")
+SAMPLE_FILENAME = os.environ.get("SFTP_SAMPLE_FILENAME", "sftp_to_wasb_test.txt")
+FILE_COMPLETE_PATH = os.path.join(LOCAL_FILE_PATH, SAMPLE_FILENAME)
+SFTP_FILE_COMPLETE_PATH = os.path.join(SFTP_SRC_PATH, SAMPLE_FILENAME)
+
+
+@task
+def delete_sftp_file():
+    """Delete a file on the SFTP server."""
+    SFTPHook().delete_file(SFTP_FILE_COMPLETE_PATH)
+
+
+with DAG(
+    "example_sftp_to_wasb",
+    schedule_interval=None,
+    catchup=False,
+    start_date=datetime(2021, 1, 1),  # Override to match your needs
+) as dag:
+    transfer_files_to_sftp_step = SFTPOperator(
+        task_id="transfer_files_from_local_to_sftp",
+        local_filepath=FILE_COMPLETE_PATH,
+        remote_filepath=SFTP_FILE_COMPLETE_PATH,
+    )
+
+    # [START how_to_sftp_to_wasb]
+    transfer_files_to_azure = SFTPToWasbOperator(
+        task_id="transfer_files_from_sftp_to_wasb",
+        # SFTP args
+        sftp_source_path=SFTP_SRC_PATH,
+        # AZURE args
+        container_name=AZURE_CONTAINER_NAME,
+        blob_prefix=BLOB_PREFIX,
+    )
+    # [END how_to_sftp_to_wasb]
+
+    delete_blob_file_step = WasbDeleteBlobOperator(
+        task_id="delete_blob_files",
+        container_name=AZURE_CONTAINER_NAME,
+        blob_name=BLOB_PREFIX + SAMPLE_FILENAME,
+    )
+
+    transfer_files_to_sftp_step >> transfer_files_to_azure >> delete_blob_file_step >> delete_sftp_file()
diff --git a/airflow/providers/microsoft/azure/provider.yaml b/airflow/providers/microsoft/azure/provider.yaml
index 8c67bbf3f5ed9..97e98909b62bc 100644
--- a/airflow/providers/microsoft/azure/provider.yaml
+++ b/airflow/providers/microsoft/azure/provider.yaml
@@ -179,6 +179,10 @@ transfers:
     target-integration-name: Google Cloud Storage (GCS)
     how-to-guide: /docs/apache-airflow-providers-microsoft-azure/operators/azure_blob_to_gcs.rst
     python-module: airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs
+  - source-integration-name: SSH File Transfer Protocol (SFTP)
+    target-integration-name: Microsoft Azure Blob Storage
+    how-to-guide: /docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst
+    python-module: airflow.providers.microsoft.azure.transfers.sftp_to_wasb
 hook-class-names:  # deprecated - to be removed after providers add dependency on Airflow 2.2.0+
   - airflow.providers.microsoft.azure.hooks.base_azure.AzureBaseHook
diff --git a/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
new file mode 100644
index 0000000000000..04e7b118706ba
--- /dev/null
+++ b/airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
@@ -0,0 +1,199 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains the SFTP to Azure Blob Storage operator."""
+import os
+import sys
+from collections import namedtuple
+from tempfile import NamedTemporaryFile
+from typing import Dict, List, Optional, Tuple
+
+if sys.version_info >= (3, 8):
+    from functools import cached_property
+else:
+    from cached_property import cached_property
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
+from airflow.providers.sftp.hooks.sftp import SFTPHook
+
+WILDCARD = "*"
+SftpFile = namedtuple('SftpFile', 'sftp_file_path, blob_name')
+
+
+class SFTPToWasbOperator(BaseOperator):
+    """
+    Transfer files from an SFTP server to Azure Blob Storage.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:SFTPToWasbOperator`
+
+    :param sftp_source_path: The SFTP remote path. This is the specified file path
+        for downloading the single file or multiple files from the SFTP server.
+        You can use only one wildcard within your path. The wildcard can appear
+        inside the path or at the end of the path.
+    :type sftp_source_path: str
+    :param container_name: Name of the container.
+    :type container_name: str
+    :param blob_prefix: Prefix to name a blob.
+    :type blob_prefix: str
+    :param sftp_conn_id: The SFTP connection id. The name or identifier for
+        establishing a connection to the SFTP server.
+    :type sftp_conn_id: str
+    :param wasb_conn_id: Reference to the wasb connection.
+    :type wasb_conn_id: str
+    :param load_options: Optional keyword arguments that
+        ``WasbHook.load_file()`` takes.
+    :type load_options: dict
+    :param move_object: When move_object is True, the object is moved instead
+        of copied to the new location. This is the equivalent of a mv command
+        as opposed to a cp command.
+    :type move_object: bool
+    :param wasb_overwrite_object: Whether the blob to be uploaded
+        should overwrite the current data.
+        When wasb_overwrite_object is True, it will overwrite the existing data.
+        If set to False, the operation might fail with
+        ResourceExistsError in case a blob object already exists.
+    :type wasb_overwrite_object: bool
+    """
+
+    template_fields = ("sftp_source_path", "container_name", "blob_prefix")
+
+    def __init__(
+        self,
+        *,
+        sftp_source_path: str,
+        container_name: str,
+        blob_prefix: str = "",
+        sftp_conn_id: str = "sftp_default",
+        wasb_conn_id: str = 'wasb_default',
+        load_options: Optional[Dict] = None,
+        move_object: bool = False,
+        wasb_overwrite_object: bool = False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.sftp_source_path = sftp_source_path
+        self.blob_prefix = blob_prefix
+        self.sftp_conn_id = sftp_conn_id
+        self.container_name = container_name
+        self.wasb_conn_id = wasb_conn_id
+        self.load_options = load_options or {"overwrite": wasb_overwrite_object}
+        self.move_object = move_object
+
+    def dry_run(self) -> None:
+        super().dry_run()
+        sftp_files: List[SftpFile] = self.get_sftp_files_map()
+        for file in sftp_files:
+            self.log.info(
+                'Process will upload file from (SFTP) %s to wasb://%s as %s',
+                file.sftp_file_path,
+                self.container_name,
+                file.blob_name,
+            )
+            if self.move_object:
+                self.log.info("Executing delete of %s", file)
+
+    def execute(self, context: Dict) -> None:
+        """Upload a file from SFTP to Azure Blob Storage."""
+        sftp_files: List[SftpFile] = self.get_sftp_files_map()
+        uploaded_files = self.copy_files_to_wasb(sftp_files)
+        if self.move_object:
+            self.delete_files(uploaded_files)
+
+    def get_sftp_files_map(self) -> List[SftpFile]:
+        """Get SFTP files from the source path; the path may contain a single wildcard."""
+        sftp_files = []
+
+        sftp_complete_path, prefix, delimiter = self.get_tree_behavior()
+
+        found_files, _, _ = self.sftp_hook.get_tree_map(
+            sftp_complete_path, prefix=prefix, delimiter=delimiter
+        )
+
+        self.log.info("Found %d files at sftp source path: %s", len(found_files), self.sftp_source_path)
+
+        for file in found_files:
+            future_blob_name = self.get_full_path_blob(file)
+            sftp_files.append(SftpFile(file, future_blob_name))
+
+        return sftp_files
+
+    def get_tree_behavior(self) -> Tuple[str, Optional[str], Optional[str]]:
+        """Extract the tree behavior (path, prefix, delimiter) used to interact with the remote folder."""
+        self.check_wildcards_limit()
+
+        if self.source_path_contains_wildcard:
+            prefix, delimiter = self.sftp_source_path.split(WILDCARD, 1)
+
+            sftp_complete_path = os.path.dirname(prefix)
+
+            return sftp_complete_path, prefix, delimiter
+
+        return self.sftp_source_path, None, None
+
+    def check_wildcards_limit(self) -> None:
+        """Check if there are multiple wildcards used in the SFTP source path."""
+        total_wildcards = self.sftp_source_path.count(WILDCARD)
+        if total_wildcards > 1:
+            raise AirflowException(
+                "Only one wildcard '*' is allowed in sftp_source_path parameter. "
+                f"Found {total_wildcards} in {self.sftp_source_path}."
+            )
+
+    @property
+    def source_path_contains_wildcard(self) -> bool:
+        """Checks if the SFTP source path contains a wildcard."""
+        return WILDCARD in self.sftp_source_path
+
+    @cached_property
+    def sftp_hook(self) -> SFTPHook:
+        """Cached SFTP hook, re-used across calls."""
+        return SFTPHook(self.sftp_conn_id)
+
+    def get_full_path_blob(self, file: str) -> str:
+        """Get a blob name based on the previous name and the blob_prefix variable."""
+        return self.blob_prefix + os.path.basename(file)
+
+    def copy_files_to_wasb(self, sftp_files: List[SftpFile]) -> List[str]:
+        """Upload a list of files from sftp_files to Azure Blob Storage with a new blob name."""
+        uploaded_files = []
+        wasb_hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
+        for file in sftp_files:
+            with NamedTemporaryFile("w") as tmp:
+                self.sftp_hook.retrieve_file(file.sftp_file_path, tmp.name)
+                self.log.info(
+                    'Uploading %s to wasb://%s as %s',
+                    file.sftp_file_path,
+                    self.container_name,
+                    file.blob_name,
+                )
+                wasb_hook.load_file(tmp.name, self.container_name, file.blob_name, **self.load_options)
+
+                uploaded_files.append(file.sftp_file_path)
+
+        return uploaded_files
+
+    def delete_files(self, uploaded_files: List[str]) -> None:
+        """Delete files on the SFTP server which have been moved to Azure Blob Storage."""
+        for sftp_file_path in uploaded_files:
+            self.log.info("Executing delete of %s", sftp_file_path)
+            self.sftp_hook.delete_file(sftp_file_path)
diff --git a/docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst b/docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst
new file mode 100644
index 0000000000000..b628681f4ee89
--- /dev/null
+++ b/docs/apache-airflow-providers-microsoft-azure/operators/sftp_to_wasb.rst
@@ -0,0 +1,61 @@
+
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+Azure Blob Storage Transfer Operator
+====================================
+The Blob service stores text and binary data as objects in the cloud.
+The Blob service offers the following three resources: the storage account, containers, and blobs.
+Within your storage account, containers provide a way to organize sets of blobs.
+For more information about the service, visit the `Azure Blob Storage API documentation `_.
+
+Before you begin
+^^^^^^^^^^^^^^^^
+Before using Blob Storage within Airflow, you need to authenticate your account with a token, login, and password.
+Please follow the Azure
+`instructions `_
+to do so.
+
+See the following example, and set values for these fields:
+
+.. code-block::
+
+      SFTP Conn Id: sftp_default
+      WASB Conn Id: wasb_default
+
+.. contents::
+  :depth: 1
+  :local:
+
+.. _howto/operator:SFTPToWasbOperator:
+
+Transfer Data from SFTP Source Path to Blob Storage
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+The operator transfers data from the SFTP source path to the specified container in Azure Blob Storage.
+
+To transfer one or more files from an SFTP source path to Azure Blob Storage, use:
+:class:`~airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPToWasbOperator`
+
+Example usage:
+
+.. exampleinclude:: /../../airflow/providers/microsoft/azure/example_dags/example_sftp_to_wasb.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_sftp_to_wasb]
+    :end-before: [END how_to_sftp_to_wasb]
diff --git a/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb.py b/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb.py
new file mode 100644
index 0000000000000..1c348353cae1e
--- /dev/null
+++ b/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb.py
@@ -0,0 +1,256 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+from unittest import mock
+
+from airflow import AirflowException
+from airflow.providers.microsoft.azure.transfers.sftp_to_wasb import SftpFile, SFTPToWasbOperator
+
+TASK_ID = "test-sftp-to-wasb-operator"
+WASB_CONN_ID = "wasb_default"
+SFTP_CONN_ID = "ssh_default"
+
+CONTAINER_NAME = "test-container"
+WILDCARD_PATH = "main_dir/*"
+WILDCARD_FILE_NAME = "main_dir/test_object*.json"
+SOURCE_PATH_NO_WILDCARD = "main_dir/"
+SOURCE_OBJECT_MULTIPLE_WILDCARDS = "main_dir/csv/*/test_*.csv"
+BLOB_PREFIX = "sponge-bob"
+EXPECTED_BLOB_NAME = "test_object3.json"
+EXPECTED_FILES = [SOURCE_PATH_NO_WILDCARD + EXPECTED_BLOB_NAME]
+
+
+class TestSFTPToWasbOperator(unittest.TestCase):
+    def test_init(self):
+        operator = SFTPToWasbOperator(
+            task_id=TASK_ID,
+            sftp_source_path=SOURCE_PATH_NO_WILDCARD,
+            sftp_conn_id=SFTP_CONN_ID,
+            container_name=CONTAINER_NAME,
+            blob_prefix=BLOB_PREFIX,
+            wasb_conn_id=WASB_CONN_ID,
+            move_object=False,
+        )
+        assert operator.sftp_source_path == SOURCE_PATH_NO_WILDCARD
+        assert operator.sftp_conn_id == SFTP_CONN_ID
+        assert operator.container_name == CONTAINER_NAME
+        assert operator.wasb_conn_id == WASB_CONN_ID
+        assert operator.blob_prefix == BLOB_PREFIX
+
+    @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.WasbHook', autospec=True)
+    def test_execute_more_than_one_wildcard_exception(self, mock_hook):
+        operator = SFTPToWasbOperator(
+            task_id=TASK_ID,
+            sftp_source_path=SOURCE_OBJECT_MULTIPLE_WILDCARDS,
+            sftp_conn_id=SFTP_CONN_ID,
+            container_name=CONTAINER_NAME,
+            blob_prefix=BLOB_PREFIX,
+            wasb_conn_id=WASB_CONN_ID,
+            move_object=False,
+        )
+        with self.assertRaises(AirflowException) as cm:
+            operator.check_wildcards_limit()
+
+        err = cm.exception
+        assert "Only one wildcard '*' is allowed" in str(err)
+
+    def test_get_sftp_tree_behavior(self):
+        operator = SFTPToWasbOperator(
+            task_id=TASK_ID,
+            sftp_source_path=WILDCARD_PATH,
+            sftp_conn_id=SFTP_CONN_ID,
+            container_name=CONTAINER_NAME,
+            wasb_conn_id=WASB_CONN_ID,
+            move_object=False,
+        )
+        sftp_complete_path, prefix, delimiter = operator.get_tree_behavior()
+
+        assert sftp_complete_path == 'main_dir', "The expected complete path does not match"
+        assert prefix == 'main_dir/', "Prefix must be equal to the path up to the wildcard"
+        assert delimiter == "", "Delimiter must be empty"
+
+    def test_get_sftp_tree_behavior_without_wildcard(self):
+        operator = SFTPToWasbOperator(
+            task_id=TASK_ID,
+            sftp_source_path=SOURCE_PATH_NO_WILDCARD,
+            sftp_conn_id=SFTP_CONN_ID,
+            container_name=CONTAINER_NAME,
+            wasb_conn_id=WASB_CONN_ID,
+            move_object=False,
+        )
+        sftp_complete_path, prefix, delimiter = operator.get_tree_behavior()
+
+        assert sftp_complete_path == 'main_dir/', "The expected complete path does not match"
+        assert prefix is None, "Prefix must be None when no wildcard is used"
+        assert delimiter is None, "Delimiter must be None when no wildcard is used"
+
+    def test_source_path_contains_wildcard(self):
+        operator = SFTPToWasbOperator(
+            task_id=TASK_ID,
+            sftp_source_path=WILDCARD_PATH,
+            sftp_conn_id=SFTP_CONN_ID,
+            container_name=CONTAINER_NAME,
+            wasb_conn_id=WASB_CONN_ID,
+            move_object=False,
+        )
+        output = operator.source_path_contains_wildcard
+        assert output is True, "This path contains a wildcard"
+
+    def test_source_path_not_contains_wildcard(self):
+        operator = SFTPToWasbOperator(
+            task_id=TASK_ID,
+            sftp_source_path=SOURCE_PATH_NO_WILDCARD,
+            sftp_conn_id=SFTP_CONN_ID,
+            container_name=CONTAINER_NAME,
+            wasb_conn_id=WASB_CONN_ID,
+            move_object=False,
+        )
+        output = operator.source_path_contains_wildcard
+        assert output is False, "This path does not contain a wildcard"
+
+    @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.WasbHook')
+    @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPHook')
+    def test_get_sftp_files_map_no_wildcard(self, sftp_hook, mock_hook):
+        sftp_hook.return_value.get_tree_map.return_value = [
+            EXPECTED_FILES,
+            [],
+            [],
+        ]
+        operator = SFTPToWasbOperator(
+            task_id=TASK_ID,
+            sftp_source_path=SOURCE_PATH_NO_WILDCARD,
+            sftp_conn_id=SFTP_CONN_ID,
+            container_name=CONTAINER_NAME,
+            wasb_conn_id=WASB_CONN_ID,
+            move_object=True,
+        )
+        files = operator.get_sftp_files_map()
+
+        assert len(files) == 1, "The expected number of found files does not match"
+        assert files[0].blob_name == EXPECTED_BLOB_NAME, "The expected blob name does not match"
+
+    @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.WasbHook')
+    @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPHook')
+    def test_copy_files_to_wasb(self, sftp_hook, mock_hook):
+        operator = SFTPToWasbOperator(
+            task_id=TASK_ID,
+            sftp_source_path=SOURCE_PATH_NO_WILDCARD,
+            sftp_conn_id=SFTP_CONN_ID,
+            container_name=CONTAINER_NAME,
+            wasb_conn_id=WASB_CONN_ID,
+            move_object=True,
+        )
+
+        sftp_files = [SftpFile(EXPECTED_FILES[0], EXPECTED_BLOB_NAME)]
+        files = operator.copy_files_to_wasb(sftp_files)
+
+        operator.sftp_hook.retrieve_file.assert_has_calls([mock.call("main_dir/test_object3.json", mock.ANY)])
+
+        mock_hook.return_value.load_file.assert_called_once_with(
+            mock.ANY, CONTAINER_NAME, EXPECTED_BLOB_NAME, overwrite=False
+        )
+
+        assert len(files) == 1, "The expected number of uploaded files does not match"
+
+    @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPHook')
+    def test_delete_files(self, sftp_hook):
+        sftp_mock = sftp_hook.return_value
+
+        operator = SFTPToWasbOperator(
+            task_id=TASK_ID,
+            sftp_source_path=SOURCE_PATH_NO_WILDCARD,
+            sftp_conn_id=SFTP_CONN_ID,
+            container_name=CONTAINER_NAME,
+            wasb_conn_id=WASB_CONN_ID,
+            move_object=True,
+        )
+
+        sftp_file_paths = EXPECTED_FILES
+        operator.delete_files(sftp_file_paths)
+
+        sftp_mock.delete_file.assert_has_calls([mock.call(EXPECTED_FILES[0])])
+
+    @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.WasbHook')
+    @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPHook')
+    def test_execute(self, sftp_hook, mock_hook):
+        operator = SFTPToWasbOperator(
+            task_id=TASK_ID,
+            sftp_source_path=WILDCARD_FILE_NAME,
+            sftp_conn_id=SFTP_CONN_ID,
+            container_name=CONTAINER_NAME,
+            wasb_conn_id=WASB_CONN_ID,
+            move_object=False,
+        )
+
+        sftp_hook.return_value.get_tree_map.return_value = [
+            ["main_dir/test_object.json"],
+            [],
+            [],
+        ]
+
+        operator.execute(None)
+
+        sftp_hook.return_value.get_tree_map.assert_called_with(
+            "main_dir", prefix="main_dir/test_object", delimiter=".json"
+        )
+
+        sftp_hook.return_value.retrieve_file.assert_has_calls(
+            [mock.call("main_dir/test_object.json", mock.ANY)]
+        )
+
+        mock_hook.return_value.load_file.assert_called_once_with(
+            mock.ANY, CONTAINER_NAME, "test_object.json", overwrite=False
+        )
+
+        sftp_hook.return_value.delete_file.assert_not_called()
+
+    @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.WasbHook')
+    @mock.patch('airflow.providers.microsoft.azure.transfers.sftp_to_wasb.SFTPHook')
+    def test_execute_moved_files(self, sftp_hook, mock_hook):
+        operator = SFTPToWasbOperator(
+            task_id=TASK_ID,
+            sftp_source_path=WILDCARD_FILE_NAME,
+            sftp_conn_id=SFTP_CONN_ID,
+            container_name=CONTAINER_NAME,
+            wasb_conn_id=WASB_CONN_ID,
+            move_object=True,
+            blob_prefix=BLOB_PREFIX,
+        )
+
+        sftp_hook.return_value.get_tree_map.return_value = [
+            ["main_dir/test_object.json"],
+            [],
+            [],
+        ]
+
+        operator.execute(None)
+
+        sftp_hook.return_value.get_tree_map.assert_called_with(
+            "main_dir", prefix="main_dir/test_object", delimiter=".json"
+        )
+
+        sftp_hook.return_value.retrieve_file.assert_has_calls(
+            [mock.call("main_dir/test_object.json", mock.ANY)]
+        )
+
+        mock_hook.return_value.load_file.assert_called_once_with(
+            mock.ANY, CONTAINER_NAME, BLOB_PREFIX + "test_object.json", overwrite=False
+        )
+        assert sftp_hook.return_value.delete_file.called is True, "File must be moved"
diff --git a/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb_system.py b/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb_system.py
new file mode 100644
index 0000000000000..600f84fc33585
--- /dev/null
+++ b/tests/providers/microsoft/azure/transfers/test_sftp_to_wasb_system.py
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import os
+
+import pytest
+
+from airflow.providers.microsoft.azure.example_dags.example_sftp_to_wasb import (
+    FILE_COMPLETE_PATH,
+    LOCAL_FILE_PATH,
+    SAMPLE_FILENAME,
+)
+from tests.test_utils.azure_system_helpers import (
+    AZURE_DAG_FOLDER,
+    AzureSystemTest,
+    provide_wasb_default_connection,
+)
+from tests.test_utils.sftp_system_helpers import provide_sftp_default_connection
+
+CREDENTIALS_DIR = os.environ.get('CREDENTIALS_DIR', '/files/airflow-breeze-config/keys')
+SFTP_DEFAULT_KEY = 'sftp_key.json'
+WASB_DEFAULT_KEY = 'wasb_key.json'
+CREDENTIALS_SFTP_PATH = os.path.join(CREDENTIALS_DIR, SFTP_DEFAULT_KEY)
+CREDENTIALS_WASB_PATH = os.path.join(CREDENTIALS_DIR, WASB_DEFAULT_KEY)
+
+
+@pytest.mark.backend('postgres', 'mysql')
+@pytest.mark.credential_file(WASB_DEFAULT_KEY)
+@pytest.mark.credential_file(SFTP_DEFAULT_KEY)
+class TestSFTPToWasbSystem(AzureSystemTest):
+    def setUp(self):
+        super().setUp()
+        self.create_dummy_file(SAMPLE_FILENAME, LOCAL_FILE_PATH)
+
+    def tearDown(self):
+        os.remove(FILE_COMPLETE_PATH)
+        super().tearDown()
+
+    @provide_wasb_default_connection(CREDENTIALS_WASB_PATH)
+    @provide_sftp_default_connection(CREDENTIALS_SFTP_PATH)
+    def test_run_example_file_to_wasb(self):
+        self.run_dag('example_sftp_to_wasb', AZURE_DAG_FOLDER)
diff --git a/tests/test_utils/sftp_system_helpers.py b/tests/test_utils/sftp_system_helpers.py
new file mode 100644
index 0000000000000..accf3c9504591
--- /dev/null
+++ b/tests/test_utils/sftp_system_helpers.py
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+import os
+from contextlib import contextmanager
+
+from airflow.exceptions import AirflowException
+from airflow.models import Connection
+from airflow.utils.process_utils import patch_environ
+
+SFTP_CONNECTION_ID = os.environ.get("SFTP_CONNECTION_ID", "sftp_default")
+
+
+@contextmanager
+def provide_sftp_default_connection(key_file_path: str):
+    """
+    Context manager that provides a temporary value for the sftp_default connection.
+
+    :param key_file_path: Path to the JSON file with the sftp_default credentials.
+    :type key_file_path: str
+    """
+    if not key_file_path.endswith(".json"):
+        raise AirflowException("Use a JSON key file.")
+    with open(key_file_path) as credentials:
+        creds = json.load(credentials)
+    conn = Connection(
+        conn_id=SFTP_CONNECTION_ID,
+        conn_type="ssh",
+        port=creds.get("port"),
+        host=creds.get("host"),
+        login=creds.get("login"),
+        password=creds.get("password"),
+        extra=json.dumps(creds.get('extra')),
+    )
+    with patch_environ({f"AIRFLOW_CONN_{conn.conn_id.upper()}": conn.get_uri()}):
+        yield
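A note on the two behaviors the new operator supports but the how-to example above does not exercise: wildcard expansion and move semantics. The sketch below is illustrative only; the DAG id, paths, and container name are hypothetical, and it relies solely on the constructor arguments defined in sftp_to_wasb.py above.

.. code-block:: python

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.microsoft.azure.transfers.sftp_to_wasb import SFTPToWasbOperator

    with DAG(
        "example_sftp_to_wasb_move",  # hypothetical DAG id
        schedule_interval=None,
        start_date=datetime(2021, 1, 1),
        catchup=False,
    ) as dag:
        move_json_to_wasb = SFTPToWasbOperator(
            task_id="move_json_files_to_wasb",
            # A single '*' wildcard is allowed; check_wildcards_limit() raises otherwise.
            sftp_source_path="/sftp/incoming/*.json",
            container_name="airflow",  # hypothetical container
            blob_prefix="incoming/",
            # mv semantics: delete each SFTP file after a successful upload.
            move_object=True,
            # Overwrite existing blobs instead of failing with ResourceExistsError.
            wasb_overwrite_object=True,
        )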
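For reviewers tracing the wildcard handling: ``get_tree_behavior()`` splits the source path on the single ``*``, so ``main_dir/test_object*.json`` yields the prefix ``main_dir/test_object``, the delimiter ``.json``, and the remote directory ``main_dir`` (the dirname of the prefix), which is exactly what the ``get_tree_map`` assertions in ``test_execute`` expect. A standalone reproduction of that split:

.. code-block:: python

    import os

    WILDCARD = "*"
    sftp_source_path = "main_dir/test_object*.json"

    # Same logic as SFTPToWasbOperator.get_tree_behavior() for a wildcard path.
    prefix, delimiter = sftp_source_path.split(WILDCARD, 1)
    sftp_complete_path = os.path.dirname(prefix)

    assert (sftp_complete_path, prefix, delimiter) == ("main_dir", "main_dir/test_object", ".json")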
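``provide_sftp_default_connection()`` reads only the keys ``port``, ``host``, ``login``, ``password``, and ``extra`` from the JSON key file. A hypothetical ``sftp_key.json`` for the system test could therefore be produced as below; every value is a placeholder, not a real credential, and the ``extra`` contents are just an assumed example of SSH connection extras.

.. code-block:: python

    import json

    # Placeholder credentials; 'extra' is passed through json.dumps() into
    # Connection.extra by provide_sftp_default_connection().
    sample_creds = {
        "host": "localhost",
        "port": 22,
        "login": "airflow",
        "password": "not-a-real-password",
        "extra": {"no_host_key_check": "true"},
    }

    # Default location used by the system test (CREDENTIALS_DIR + SFTP_DEFAULT_KEY).
    with open("/files/airflow-breeze-config/keys/sftp_key.json", "w") as key_file:
        json.dump(sample_creds, key_file, indent=2)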