Skip to content

Commit

Permalink
Update Teradata Provider (#32)
Browse files Browse the repository at this point in the history
Added new features to Teradata Provider.

• Introduction of Stored Procedure Support in Teradata Hook
• Inclusion of the TeradataStoredProcedureOperator for executing stored procedures
• Integration of Azure Blob Storage to Teradata Transfer Operator
• Integration of Amazon S3 to Teradata Transfer Operator
• Provision of necessary documentation, along with unit and system tests, for the Teradata Provider modifications.
  • Loading branch information
satish-chinthanippu committed Apr 24, 2024
1 parent 6e51198 commit 6e6071e
Show file tree
Hide file tree
Showing 21 changed files with 1,189 additions and 16 deletions.
67 changes: 67 additions & 0 deletions airflow/providers/teradata/hooks/teradata.py
Expand Up @@ -32,6 +32,17 @@
if TYPE_CHECKING:
from airflow.models.connection import Connection

PARAM_TYPES = {bool, float, int, str}


def _map_param(value):
if value in PARAM_TYPES:
# In this branch, value is a Python type; calling it produces
# an instance of the type which is understood by the Teradata driver
# in the out parameter mapping mechanism.
value = value()
return value


class TeradataHook(DbApiHook):
"""General hook for interacting with Teradata SQL Database.
Expand Down Expand Up @@ -187,3 +198,59 @@ def get_ui_field_behaviour() -> dict:
"password": "dbc",
},
}

def callproc(
    self,
    identifier: str,
    autocommit: bool = False,
    parameters: list | dict | None = None,
) -> list | dict | tuple | None:
    """
    Call the stored procedure identified by the provided string.

    Any OUT parameters must be provided with a value of either the
    expected Python type (e.g., `int`) or an instance of that type.

    :param identifier: stored procedure name; it is interpolated into the
        ``{CALL ...}`` statement as-is.
    :param autocommit: What to set the connection's autocommit setting to
        before executing the query.
    :param parameters: The `IN`, `OUT` and `INOUT` parameters for Teradata
        stored procedure. When a dict is given, only its values are bound,
        in insertion order; the keys are not sent to the database.
    :return: whatever ``self.run`` returns for the statement, using a handler
        that copies the fetched records into a new list (or dict).
    :raises TypeError: from the handler, when ``cursor.fetchall()`` yields
        something that is neither ``None``, a list nor a dict.
    """
    if parameters is None:
        parameters = []

    # The statement uses positional ``?`` markers, so the driver needs a
    # sequence. A mapping therefore contributes only its values.
    raw_values = parameters.values() if isinstance(parameters, dict) else parameters
    bind_values = [_map_param(value) for value in raw_values]

    placeholders = ",".join("?" for _ in bind_values)
    sql = f"{{CALL {identifier}({placeholders})}}"

    def handler(cursor):
        records = cursor.fetchall()

        if records is None:
            return None
        if isinstance(records, list):
            return list(records)
        if isinstance(records, dict):
            return dict(records)
        raise TypeError(f"Unexpected results: {records}")

    return self.run(
        sql,
        autocommit=autocommit,
        parameters=bind_values,
        handler=handler,
    )
43 changes: 41 additions & 2 deletions airflow/providers/teradata/operators/teradata.py
Expand Up @@ -17,11 +17,15 @@
# under the License.
from __future__ import annotations

from typing import Sequence
from typing import TYPE_CHECKING, Sequence

from airflow.models import BaseOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.teradata.hooks.teradata import TeradataHook

if TYPE_CHECKING:
from airflow.utils.context import Context


class TeradataOperator(SQLExecuteQueryOperator):
"""
Expand All @@ -41,8 +45,8 @@ class TeradataOperator(SQLExecuteQueryOperator):
"""

template_fields: Sequence[str] = (
"parameters",
"sql",
"parameters",
)
template_ext: Sequence[str] = (".sql",)
template_fields_renderers = {"sql": "sql"}
Expand All @@ -62,3 +66,38 @@ def __init__(
}
super().__init__(**kwargs)
self.conn_id = conn_id


class TeradataStoredProcedureOperator(BaseOperator):
    """
    Call a stored procedure on a Teradata database.

    :param procedure: name of the stored procedure to invoke (templated)
    :param conn_id: The :ref:`Teradata connection id <howto/connection:teradata>`
        identifying the Teradata database to run against.
    :param parameters: (optional, templated) the parameters handed to the call
    """

    template_fields: Sequence[str] = ("procedure", "parameters")
    ui_color = "#ededed"

    def __init__(
        self,
        *,
        procedure: str,
        conn_id: str = TeradataHook.default_conn_name,
        parameters: dict | list | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.procedure = procedure
        self.parameters = parameters
        self.conn_id = conn_id

    def execute(self, context: Context):
        """Invoke the procedure through ``TeradataHook.callproc`` with autocommit on and return its result."""
        teradata_hook = TeradataHook(teradata_conn_id=self.conn_id)
        procedure_name, call_parameters = self.procedure, self.parameters
        return teradata_hook.callproc(procedure_name, autocommit=True, parameters=call_parameters)
10 changes: 10 additions & 0 deletions airflow/providers/teradata/provider.yaml
Expand Up @@ -32,6 +32,8 @@ dependencies:
- apache-airflow-providers-common-sql>=1.3.1
- teradatasqlalchemy>=17.20.0.0
- teradatasql>=17.20.0.28
- apache-airflow-providers-microsoft-azure
- apache-airflow-providers-amazon

integrations:
- integration-name: Teradata
Expand All @@ -56,6 +58,14 @@ transfers:
target-integration-name: Teradata
python-module: airflow.providers.teradata.transfers.teradata_to_teradata
how-to-guide: /docs/apache-airflow-providers-teradata/operators/teradata_to_teradata.rst
- source-integration-name: Microsoft Azure Blob Storage
target-integration-name: Teradata
python-module: airflow.providers.teradata.transfers.azure_blob_to_teradata
how-to-guide: /docs/apache-airflow-providers-teradata/operators/azure_blob_to_teradata.rst
- source-integration-name: Amazon Simple Storage Service (S3)
target-integration-name: Teradata
python-module: airflow.providers.teradata.transfers.s3_to_teradata
how-to-guide: /docs/apache-airflow-providers-teradata/operators/s3_to_teradata.rst

connection-types:
- hook-class-name: airflow.providers.teradata.hooks.teradata.TeradataHook
Expand Down
105 changes: 105 additions & 0 deletions airflow/providers/teradata/transfers/azure_blob_to_teradata.py
@@ -0,0 +1,105 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, Sequence

from airflow.models import BaseOperator
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
from airflow.providers.teradata.hooks.teradata import TeradataHook

if TYPE_CHECKING:
from airflow.utils.context import Context


class AzureBlobStorageToTeradataOperator(BaseOperator):
    """
    Load CSV, JSON and Parquet format data from Azure Blob Storage into a Teradata table.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:AzureBlobStorageToTeradataOperator`

    :param blob_source_key: URI of the Azure blob object store location. (templated)
        The URI format is `/az/YOUR-STORAGE-ACCOUNT.blob.core.windows.net/YOUR-CONTAINER/YOUR-BLOB-LOCATION`.
        Refer to
        https://docs.teradata.com/search/documents?query=native+object+store&sort=last_update&virtual-field=title_only&content-lang=en-US
    :param azure_conn_id: The Airflow WASB connection whose login and password are used
        as the Azure blob credentials.
    :param teradata_table: Name of the Teradata table that is created and filled. (templated)
    :param teradata_conn_id: The connection ID used to connect to Teradata
        :ref:`Teradata connection <howto/connection:Teradata>`

    Note that ``blob_source_key`` and ``teradata_table`` are
    templated, so you can use variables in them if you wish.
    """

    template_fields: Sequence[str] = ("blob_source_key", "teradata_table")
    ui_color = "#e07c24"

    def __init__(
        self,
        *,
        blob_source_key: str,
        azure_conn_id: str = "azure_default",
        teradata_table: str,
        teradata_conn_id: str = "teradata_default",
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.teradata_conn_id = teradata_conn_id
        self.teradata_table = teradata_table
        self.azure_conn_id = azure_conn_id
        self.blob_source_key = blob_source_key

    def execute(self, context: Context) -> None:
        """Create ``teradata_table`` from the blob location; log and re-raise any failure."""
        self.log.info(
            "transferring data from %s to teradata table %s...", self.blob_source_key, self.teradata_table
        )
        # The WASB connection's login/password serve as ACCESS_ID/ACCESS_KEY;
        # missing values are sent as empty strings.
        wasb_connection = WasbHook(wasb_conn_id=self.azure_conn_id).get_connection(self.azure_conn_id)
        access_id = "" if wasb_connection.login is None else wasb_connection.login
        access_secret = "" if wasb_connection.password is None else wasb_connection.password
        teradata_hook = TeradataHook(teradata_conn_id=self.teradata_conn_id)
        sql = f"""
CREATE MULTISET TABLE {self.teradata_table} AS
(
SELECT * FROM (
LOCATION = '{self.blob_source_key}'
ACCESS_ID= '{access_id}'
ACCESS_KEY= '{access_secret}'
) AS d
) WITH DATA
"""
        try:
            teradata_hook.run(sql, True)
        except Exception as ex:
            # Pick the most specific log message for the failure, then
            # re-raise the original exception unchanged.
            error_text = str(ex)
            if "Error 3524" in error_text:
                self.log.error("The user does not have CREATE TABLE access in teradata")
            elif "Error 9134" in error_text:
                self.log.error(
                    "There is an issue with the transfer operation. Please validate azure and "
                    "teradata connection details."
                )
            else:
                self.log.error("Issue occurred at Teradata: %s", error_text)
            raise
        self.log.info("The transfer of data from Azure Blob to Teradata was successful")
110 changes: 110 additions & 0 deletions airflow/providers/teradata/transfers/s3_to_teradata.py
@@ -0,0 +1,110 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, Sequence

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.teradata.hooks.teradata import TeradataHook

if TYPE_CHECKING:
from airflow.utils.context import Context


class S3ToTeradataOperator(BaseOperator):
    """
    Load CSV, JSON and Parquet format data from Amazon S3 into a Teradata table.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:S3ToTeradataOperator`

    :param s3_source_key: URI of the S3 object store location. (templated)
        The URI format is /s3/YOUR-BUCKET.s3.amazonaws.com/YOUR-BUCKET-NAME.
        Refer to
        https://docs.teradata.com/search/documents?query=native+object+store&sort=last_update&virtual-field=title_only&content-lang=en-US
    :param teradata_table: Name of the Teradata table that is created and filled. (templated)
    :param aws_conn_id: The Airflow AWS connection whose access key id and secret access key
        are used as the AWS credentials.
    :param teradata_conn_id: The connection ID used to connect to Teradata
        :ref:`Teradata connection <howto/connection:Teradata>`.

    Note that ``s3_source_key`` and ``teradata_table`` are
    templated, so you can use variables in them if you wish.
    """

    template_fields: Sequence[str] = ("s3_source_key", "teradata_table")
    ui_color = "#e07c24"

    def __init__(
        self,
        *,
        s3_source_key: str,
        teradata_table: str,
        aws_conn_id: str = "aws_default",
        teradata_conn_id: str = "teradata_default",
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.teradata_conn_id = teradata_conn_id
        self.aws_conn_id = aws_conn_id
        self.teradata_table = teradata_table
        self.s3_source_key = s3_source_key

    def execute(self, context: Context) -> None:
        """Create ``teradata_table`` from the S3 location; log and re-raise any failure."""
        self.log.info(
            "transferring data from %s to teradata table %s...", self.s3_source_key, self.teradata_table
        )

        # The AWS connection's key pair serves as ACCESS_ID/ACCESS_KEY;
        # missing values are sent as empty strings.
        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id)
        key_id = s3_hook.conn_config.aws_access_key_id
        access_key = "" if key_id is None else key_id
        secret_key = s3_hook.conn_config.aws_secret_access_key
        access_secret = "" if secret_key is None else secret_key

        teradata_hook = TeradataHook(teradata_conn_id=self.teradata_conn_id)
        sql = f"""
CREATE MULTISET TABLE {self.teradata_table} AS
(
SELECT * FROM (
LOCATION = '{self.s3_source_key}'
ACCESS_ID= '{access_key}'
ACCESS_KEY= '{access_secret}'
) AS d
) WITH DATA
"""
        try:
            teradata_hook.run(sql, True)
        except Exception as ex:
            # Pick the most specific log message for the failure, then
            # re-raise the original exception unchanged.
            error_text = str(ex)
            if "Error 3524" in error_text:
                self.log.error("The user does not have CREATE TABLE access in teradata")
            elif "Error 9134" in error_text:
                self.log.error(
                    "There is an issue with the transfer operation. Please validate s3 and "
                    "teradata connection details."
                )
            else:
                self.log.error("Issue occurred at Teradata: %s", error_text)
            raise
        self.log.info("The transfer of data from S3 to Teradata was successful")

0 comments on commit 6e6071e

Please sign in to comment.