Updates to Teradata Provider #39217

Open · wants to merge 9 commits into main
66 changes: 66 additions & 0 deletions airflow/providers/teradata/hooks/teradata.py
@@ -32,6 +32,17 @@
if TYPE_CHECKING:
    from airflow.models.connection import Connection

PARAM_TYPES = {bool, float, int, str}


def _map_param(value):
    if value in PARAM_TYPES:
        # In this branch, value is a Python type; calling it produces
        # an instance of the type which is understood by the Teradata driver
        # in the out parameter mapping mechanism.
        value = value()
    return value
Comment on lines +38 to +44

Contributor: Not sure that I understand this function.

Contributor Author: This function translates stored procedure OUT parameters into a format the driver understands. For instance, str is converted to an empty string (''). Stored procedures can be invoked with output parameters in various ways, as illustrated below.

TeradataStoredProcedureOperator(
    task_id="opr_sp_types",
    procedure="TEST_PROCEDURE",
    parameters=[3, 1, int, str],
)

This results in the statement {CALL TEST_PROCEDURE(?,?,?,?)} with parameters [3, 1, 0, ''].
Without this function, the statement would be {CALL TEST_PROCEDURE(?,?,?,?)} with parameters [3, 1, <class 'int'>, <class 'str'>], which fails with an error.

Similarly, consider another invocation of the TeradataStoredProcedureOperator:

TeradataStoredProcedureOperator(
    task_id="opr_sp_place_holder",
    procedure="TEST_PROCEDURE",
    parameters=[3, 1, "?", "?"],
)

This will translate to the statement: {CALL TEST_PROCEDURE(?,?,?,?)}, with parameters: [3, 1, ?, ?].

Example DAG - https://github.com/apache/airflow/blob/1747e64f51f53a50a62ed31550be9ecf0c5e4ac7/tests/system/providers/teradata/example_teradata_call_sp.py
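To make the conversion concrete, here is a minimal sketch (not part of the diff) of how _map_param normalizes a mixed parameter list before it reaches the driver:

from airflow.providers.teradata.hooks.teradata import _map_param

params = [3, 1, int, str]
print([_map_param(p) for p in params])
# prints [3, 1, 0, '']: type objects become empty instances, concrete values pass through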



class TeradataHook(DbApiHook):
    """General hook for interacting with Teradata SQL Database.
@@ -187,3 +198,58 @@ def get_ui_field_behaviour() -> dict:
                "password": "dbc",
            },
        }

    def callproc(
        self,
        identifier: str,
        autocommit: bool = False,
        parameters: list | dict | None = None,
    ) -> list | dict | tuple | None:
        """
        Call the stored procedure identified by the provided string.

        Any OUT parameters must be provided with a value of either the
        expected Python type (e.g., `int`) or an instance of that type.

        :param identifier: stored procedure name
        :param autocommit: What to set the connection's autocommit setting to
            before executing the query.
        :param parameters: The `IN`, `OUT` and `INOUT` parameters for Teradata
            stored procedure

        The return value is a list or mapping that includes parameters in
        both directions; the actual return type depends on the type of the
        provided `parameters` argument.
        """
        if parameters is None:
            parameters = []

        args = ",".join("?" for name in parameters)

        sql = f"{{CALL {identifier}({(args)})}}"

        def handler(cursor):
            records = cursor.fetchall()

            if records is None:
                return
            if isinstance(records, list):
                return [row for row in records]

            if isinstance(records, dict):
                return {n: v for (n, v) in records.items()}
            raise TypeError(f"Unexpected results: {records}")

        result = self.run(
            sql,
            autocommit=autocommit,
            parameters=(
                [_map_param(value) for (name, value) in parameters.items()]
                if isinstance(parameters, dict)
                else [_map_param(value) for value in parameters]
            ),
            handler=handler,
        )

        return result
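As a usage illustration (the connection id and procedure name are placeholders, not taken from the diff), the new hook method can also be called directly:

from airflow.providers.teradata.hooks.teradata import TeradataHook

hook = TeradataHook(teradata_conn_id="teradata_default")
# Issues {CALL TEST_PROCEDURE(?,?,?,?)} with parameters [3, 1, 0, ''];
# the OUT placeholders (int, str) are normalized by _map_param.
result = hook.callproc("TEST_PROCEDURE", autocommit=True, parameters=[3, 1, int, str])
print(result)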
49 changes: 44 additions & 5 deletions airflow/providers/teradata/operators/teradata.py
@@ -17,11 +17,15 @@
# under the License.
from __future__ import annotations

from typing import Sequence
from typing import TYPE_CHECKING, Sequence

from airflow.models import BaseOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.teradata.hooks.teradata import TeradataHook

if TYPE_CHECKING:
    from airflow.utils.context import Context


class TeradataOperator(SQLExecuteQueryOperator):
    """
@@ -34,23 +38,23 @@ class TeradataOperator(SQLExecuteQueryOperator):
    :ref:`howto/operator:TeradataOperator`

    :param sql: the SQL query to be executed as a single string, or a list of str (SQL statements)
    :param conn_id: reference to a predefined database
    :param teradata_conn_id: reference to a predefined database
    :param autocommit: if True, each command is automatically committed. (default value: False)
    :param parameters: (optional) the parameters to render the SQL query with.
    :param schema: The Teradata database to connect to.
    """

    template_fields: Sequence[str] = (
        "parameters",
        "sql",
        "parameters",
    )
    template_ext: Sequence[str] = (".sql",)
    template_fields_renderers = {"sql": "sql"}
    ui_color = "#e07c24"

    def __init__(
        self,
        conn_id: str = TeradataHook.default_conn_name,
        teradata_conn_id: str = TeradataHook.default_conn_name,
        schema: str | None = None,
        **kwargs,
    ) -> None:
@@ -61,4 +65,39 @@ def __init__(
            **hook_params,
        }
        super().__init__(**kwargs)
        self.conn_id = conn_id
        self.conn_id = teradata_conn_id
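A short sketch (illustrative only; the task id, connection id, and SQL are placeholders) of how the renamed teradata_conn_id parameter is used:

from airflow.providers.teradata.operators.teradata import TeradataOperator

create_table = TeradataOperator(
    task_id="create_table",
    teradata_conn_id="teradata_default",
    sql="CREATE TABLE example_table (id INTEGER, name VARCHAR(50));",
)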


class TeradataStoredProcedureOperator(BaseOperator):
    """
    Executes a stored procedure in a specific Teradata database.

    :param procedure: name of stored procedure to call (templated)
    :param teradata_conn_id: The :ref:`Teradata connection id <howto/connection:teradata>`
        reference to a specific Teradata database.
    :param parameters: (optional, templated) the parameters provided in the call
    """

    template_fields: Sequence[str] = (
        "procedure",
        "parameters",
    )
    ui_color = "#ededed"

    def __init__(
        self,
        *,
        procedure: str,
        teradata_conn_id: str = TeradataHook.default_conn_name,
        parameters: dict | list | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.teradata_conn_id = teradata_conn_id
        self.procedure = procedure
        self.parameters = parameters

    def execute(self, context: Context):
        hook = TeradataHook(teradata_conn_id=self.teradata_conn_id)
        return hook.callproc(self.procedure, autocommit=True, parameters=self.parameters)
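A minimal DAG sketch showing the operator in context (dag_id, schedule, and connection id are illustrative placeholders, patterned on the example DAG referenced above):

from __future__ import annotations

import datetime

from airflow import DAG
from airflow.providers.teradata.operators.teradata import TeradataStoredProcedureOperator

with DAG(
    dag_id="example_teradata_call_sp",
    start_date=datetime.datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    opr_sp_types = TeradataStoredProcedureOperator(
        task_id="opr_sp_types",
        teradata_conn_id="teradata_default",
        procedure="TEST_PROCEDURE",
        parameters=[3, 1, int, str],
    )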
16 changes: 16 additions & 0 deletions airflow/providers/teradata/provider.yaml
@@ -34,6 +34,14 @@ dependencies:
  - teradatasqlalchemy>=17.20.0.0
  - teradatasql>=17.20.0.28

additional-extras:
  - name: microsoft.azure
    dependencies:
      - apache-airflow-providers-microsoft-azure
  - name: amazon
    dependencies:
      - apache-airflow-providers-amazon

integrations:
  - integration-name: Teradata
    external-doc-url: https://www.teradata.com/
@@ -57,6 +65,14 @@ transfers:
    target-integration-name: Teradata
    python-module: airflow.providers.teradata.transfers.teradata_to_teradata
    how-to-guide: /docs/apache-airflow-providers-teradata/operators/teradata_to_teradata.rst
  - source-integration-name: Microsoft Azure Blob Storage
    target-integration-name: Teradata
    python-module: airflow.providers.teradata.transfers.azure_blob_to_teradata
    how-to-guide: /docs/apache-airflow-providers-teradata/operators/azure_blob_to_teradata.rst
  - source-integration-name: Amazon Simple Storage Service (S3)
    target-integration-name: Teradata
    python-module: airflow.providers.teradata.transfers.s3_to_teradata
    how-to-guide: /docs/apache-airflow-providers-teradata/operators/s3_to_teradata.rst

connection-types:
  - hook-class-name: airflow.providers.teradata.hooks.teradata.TeradataHook
103 changes: 103 additions & 0 deletions airflow/providers/teradata/transfers/azure_blob_to_teradata.py
@@ -0,0 +1,103 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from textwrap import dedent
from typing import TYPE_CHECKING, Sequence

from airflow.models import BaseOperator

try:
    from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
except ModuleNotFoundError as e:
    from airflow.exceptions import AirflowOptionalProviderFeatureException

    raise AirflowOptionalProviderFeatureException(e)

from airflow.providers.teradata.hooks.teradata import TeradataHook

if TYPE_CHECKING:
    from airflow.utils.context import Context


class AzureBlobStorageToTeradataOperator(BaseOperator):
    """
    Loads CSV, JSON and Parquet format data from Azure Blob Storage to Teradata.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:AzureBlobStorageToTeradataOperator`

    :param blob_source_key: The URI specifying the location of the Azure Blob object store. (templated)
        The URI format is `/az/YOUR-STORAGE-ACCOUNT.blob.core.windows.net/YOUR-CONTAINER/YOUR-BLOB-LOCATION`.
        Refer to
        https://docs.teradata.com/search/documents?query=native+object+store&sort=last_update&virtual-field=title_only&content-lang=en-US
    :param azure_conn_id: The Airflow WASB connection used for Azure Blob credentials.
    :param teradata_table: The name of the Teradata table to which the data is transferred. (templated)
    :param teradata_conn_id: The connection ID used to connect to Teradata,
        :ref:`Teradata connection <howto/connection:Teradata>`.

    Note that ``blob_source_key`` and ``teradata_table`` are
    templated, so you can use variables in them if you wish.
    """

    template_fields: Sequence[str] = ("blob_source_key", "teradata_table")
    ui_color = "#e07c24"

    def __init__(
        self,
        *,
        blob_source_key: str,
        azure_conn_id: str = "azure_default",
        teradata_table: str,
        teradata_conn_id: str = "teradata_default",
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.blob_source_key = blob_source_key
        self.azure_conn_id = azure_conn_id
        self.teradata_table = teradata_table
        self.teradata_conn_id = teradata_conn_id

    def execute(self, context: Context) -> None:
        self.log.info(
            "transferring data from %s to teradata table %s...", self.blob_source_key, self.teradata_table
        )
        azure_hook = WasbHook(wasb_conn_id=self.azure_conn_id)
        conn = azure_hook.get_connection(self.azure_conn_id)
        # Obtain the Azure client ID and Azure secret in order to access a specified Blob container
        access_id = conn.login if conn.login is not None else ""
        access_secret = conn.password if conn.password is not None else ""
        teradata_hook = TeradataHook(teradata_conn_id=self.teradata_conn_id)
        sql = dedent(f"""
            CREATE MULTISET TABLE {self.teradata_table} AS
            (
                SELECT * FROM (
                    LOCATION = '{self.blob_source_key}'
                    ACCESS_ID= '{access_id}'
                    ACCESS_KEY= '{access_secret}'
Comment:

Contributor: Won't this expose the secret in the logs?

Contributor Author: Access_Id_Key_Log
The secret is logged as masked in the log.

                ) AS d
            ) WITH DATA
            """).rstrip()
        try:
            teradata_hook.run(sql, True)
        except Exception as ex:
            self.log.error(str(ex))
            raise
        self.log.info("The transfer of data from Azure Blob to Teradata was successful")