Updates to Teradata Provider #39217
Nice set of updates!
SELECT * FROM (
    LOCATION = '{self.blob_source_key}'
    ACCESS_ID= '{access_id}'
    ACCESS_KEY= '{access_secret}'
Won't this expose the secret in the logs?
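One way to address the concern above is to redact the credentials from the rendered SQL before it reaches the logs. This is a hedged sketch, not part of the PR; redact_sql is a hypothetical helper:

```python
def redact_sql(sql: str, secrets) -> str:
    """Replace each secret value with '***' so it never appears in log output."""
    for secret in secrets:
        if secret:  # skip empty strings so nothing is mangled
            sql = sql.replace(secret, "***")
    return sql

print(redact_sql("ACCESS_KEY= 'hunter2'", ["hunter2"]))  # -> ACCESS_KEY= '***'
```

The operator would log `redact_sql(sql, [access_id, access_secret])` instead of the raw statement.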
# Handling permission issue errors
if "Error 3524" in str(ex):
    self.log.error("The user does not have CREATE TABLE access in teradata")
    raise
if "Error 9134" in str(ex):
    self.log.error(
        "There is an issue with the transfer operation. Please validate azure and "
        "teradata connection details."
    )
    raise
I'm curious. What does the original error message look like?
Do these messages add value on top of the original error message?
I would expect the service to return all the needed information in the exception itself.
Agreed. Removed the additional exception checks and instead added an error log for the exception received. Teradata returns the needed exception information.
Implemented enhancements to the Teradata Provider. This release incorporates the following features:
• Introduction of stored procedure support in the Teradata Hook
• Inclusion of the TeradataStoredProcedureOperator for executing stored procedures
• Integration of an Azure Blob Storage to Teradata transfer operator
• Integration of an Amazon S3 to Teradata transfer operator
• Provision of necessary documentation, along with unit and system tests, for the Teradata Provider modifications
Reopening due to accidental closure.
- apache-airflow-providers-microsoft-azure
- apache-airflow-providers-amazon
Setting this means that anyone who is using the Teradata provider will be forced to also install the Azure and Amazon providers.
I don't think that's right, as these integrations are not part of the core functionality this provider offers (unlike, for example, common-sql). I suggest moving them to optional dependencies like:
airflow/airflow/providers/google/provider.yaml
Lines 167 to 185 in 1074b8e

additional-extras:
  - name: apache.beam
    dependencies:
      - apache-beam[gcp]
  - name: cncf.kubernetes
    dependencies:
      - apache-airflow-providers-cncf-kubernetes>=7.2.0
  - name: leveldb
    dependencies:
      - plyvel
  - name: oracle
    dependencies:
      - apache-airflow-providers-oracle>=3.1.0
  - name: facebook
    dependencies:
      - apache-airflow-providers-facebook>=2.2.0
  - name: amazon
    dependencies:
      - apache-airflow-providers-amazon>=2.6.0
This will allow users of teradata to choose if they want to add the optional dependencies into their installation.
You can also wrap the imports with AirflowOptionalProviderFeatureException to prevent cases where someone uses operator without having the underlying provider installed (similar to what I suggested in #39366 (comment) )
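The guarded-import pattern suggested above can be sketched roughly as follows. This is a minimal sketch: optional_import is a hypothetical helper, and a stand-in exception class is defined so the snippet runs outside an Airflow environment.

```python
import importlib

try:
    from airflow.exceptions import AirflowOptionalProviderFeatureException
except ImportError:  # stand-in so the sketch runs without Airflow installed
    class AirflowOptionalProviderFeatureException(Exception):
        pass


def optional_import(module_path: str, extra: str):
    """Import an optional provider module, failing with a clear message if absent."""
    try:
        return importlib.import_module(module_path)
    except ImportError as err:
        raise AirflowOptionalProviderFeatureException(
            f"This feature requires the optional '{extra}' dependency; "
            f"could not import {module_path}."
        ) from err
```

A transfer operator would then resolve its hook at runtime, e.g. `optional_import("airflow.providers.amazon.aws.hooks.s3", "amazon")`, instead of importing it at module load time.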
@eladkal thank you. Will change it.
Modified as suggested
def _map_param(value):
    if value in PARAM_TYPES:
        # In this branch, value is a Python type; calling it produces
        # an instance of the type which is understood by the Teradata driver
        # in the out parameter mapping mechanism.
        value = value()
    return value
Not sure that I understand this function.
This function is used to translate stored procedure out parameters into a format understood by the driver. For instance, str will be converted to an empty string (''). Stored procedures can be invoked with output parameters in various ways, as illustrated below.
TeradataStoredProcedureOperator(
    task_id="opr_sp_types",
    procedure="TEST_PROCEDURE",
    parameters=[3, 1, int, str],
)
This will result in the statement {CALL TEST_PROCEDURE(?,?,?,?)} with parameters [3, 1, 0, ''].
If we omit this function, the statement would be converted to {CALL TEST_PROCEDURE(?,?,?,?)} with parameters [3, 1, <class 'int'>, <class 'str'>], which fails with an error.
Similarly, consider another invocation of the TeradataStoredProcedureOperator:
TeradataStoredProcedureOperator(
    task_id="opr_sp_place_holder",
    procedure="TEST_PROCEDURE",
    parameters=[3, 1, "?", "?"],
)
This will translate to the statement {CALL TEST_PROCEDURE(?,?,?,?)} with parameters [3, 1, ?, ?].
sql = f"""
    CREATE MULTISET TABLE {self.teradata_table} AS
    (
        SELECT * FROM (
            LOCATION = '{self.s3_source_key}'
            ACCESS_ID= '{access_key}'
            ACCESS_KEY= '{access_secret}'
        ) AS d
    ) WITH DATA
"""
I would recommend using from textwrap import dedent and removing the redundant whitespace:
Suggested change:
sql = dedent(f"""
    CREATE MULTISET TABLE {self.teradata_table} AS
    (
        SELECT * FROM (
            LOCATION = '{self.s3_source_key}'
            ACCESS_ID = '{access_key}'
            ACCESS_KEY = '{access_secret}'
        ) AS d
    ) WITH DATA
""").rstrip()
Implemented the recommendation.
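For reference, a runnable sketch of the dedent + rstrip behavior on a similar f-string. The table name and key are made-up values for illustration:

```python
from textwrap import dedent

teradata_table = "example_table"      # hypothetical value
s3_source_key = "/s3/bucket/key.csv"  # hypothetical value

sql = dedent(f"""
    CREATE MULTISET TABLE {teradata_table} AS
    (
        SELECT * FROM (
            LOCATION = '{s3_source_key}'
        ) AS d
    ) WITH DATA
""").rstrip()

# dedent strips the common 4-space indentation; rstrip drops the trailing newline.
print(sql)
```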
sql = f"""
    CREATE MULTISET TABLE {self.teradata_table} AS
    (
        SELECT * FROM (
            LOCATION = '{self.blob_source_key}'
            ACCESS_ID= '{access_id}'
            ACCESS_KEY= '{access_secret}'
        ) AS d
    ) WITH DATA
"""
Suggested change:
sql = dedent(f"""
    CREATE MULTISET TABLE {self.teradata_table} AS
    (
        SELECT * FROM (
            LOCATION = '{self.blob_source_key}'
            ACCESS_ID = '{access_id}'
            ACCESS_KEY = '{access_secret}'
        ) AS d
    ) WITH DATA
""").rstrip()
Implemented the recommendation.
access_key = (
    s3_hook.conn_config.aws_access_key_id if s3_hook.conn_config.aws_access_key_id is not None else ""
)
access_secret = (
    s3_hook.conn_config.aws_secret_access_key
    if s3_hook.conn_config.aws_secret_access_key is not None
    else ""
)
This one is not semantically correct in the case of an AWS Connection.
The connection itself might not contain aws_access_key_id / aws_secret_access_key; they could be obtained through the botocore / boto3 credential strategy.
You should call the get_credentials method, which returns a named tuple with frozen credentials:
ReadOnlyCredentials = namedtuple(
    'ReadOnlyCredentials', ['access_key', 'secret_key', 'token']
)
Please note that if token is not None you should also provide it, because without it (the STS session token) the credentials are not valid.
I can't find how to do this within Teradata, because the manual doesn't contain such information: https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Authentication-for-External-Object-Stores/Using-AWS-Assume-Role/Setting-Up-Assume-Role-on-Analytics-Database — however, this KB shows that it is somehow supported.
And finally, it is pretty difficult to handle anonymous access, because there is no out-of-the-box solution for that in the AWS hooks, so I would recommend adding a separate parameter for it, so we could skip obtaining the connection entirely when this kind of access is required.
- Utilizing the get_credentials function to fetch the access key and secret from the AWS connection.
- Introduced a new parameter, public_bucket, to indicate whether the bucket is publicly accessible. get_credentials is called only when the bucket is not public.
- Concerning the STS session token: we have a JIRA task pending to integrate the Teradata authorization object as a parameter for the cloud transfer operators. Session token support will be implemented concurrently with the resolution of that JIRA task.
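The approach described above can be sketched as follows. This is hedged: credentials_for_transfer is a hypothetical helper, and the real operator would call get_credentials on an Airflow S3Hook rather than the stub used here:

```python
from collections import namedtuple

# Shape of botocore's frozen credentials, as noted in the review above.
ReadOnlyCredentials = namedtuple("ReadOnlyCredentials", ["access_key", "secret_key", "token"])


def credentials_for_transfer(s3_hook, public_bucket: bool):
    """Return (access_id, access_secret) for the NOS request.

    For a public bucket the connection is not consulted at all. STS temporary
    credentials (non-None token) are rejected here, since passing the session
    token is deferred to the pending authorization-object work.
    """
    if public_bucket:
        return "", ""
    creds = s3_hook.get_credentials()
    if creds.token is not None:
        raise NotImplementedError("STS session tokens are not supported yet")
    return creds.access_key or "", creds.secret_key or ""
```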
sql="""
    SELECT count(1) from example_blob_teradata_csv;
""",
There are a couple of places where the triple-quoted string literals are not required:
Suggested change:
sql="SELECT count(1) from example_blob_teradata_csv;",
Addressed suggestion in all applicable places.
sql="""
    DROP TABLE example_blob_teradata_csv;
""",
Suggested change:
sql="DROP TABLE example_blob_teradata_csv;",
Addressed suggestion in all applicable places.
sql="""
    DROP TABLE example_s3_teradata_csv;
""",
Suggested change:
sql="DROP TABLE example_s3_teradata_csv;",
Addressed suggestion in all applicable places.
sql="""
    SELECT * from example_s3_teradata_json;
""",
Suggested change:
sql="SELECT * from example_s3_teradata_json;",
Addressed suggestion in all applicable places.
sql="""
    DROP TABLE example_s3_teradata_json;
""",
Suggested change:
sql="DROP TABLE example_s3_teradata_json;",
sql="""
    SELECT * from example_s3_teradata_parquet;
""",
Suggested change:
sql="SELECT * from example_s3_teradata_parquet;",
sql="""
    DROP TABLE example_s3_teradata_parquet;
""",
Suggested change:
sql="DROP TABLE example_s3_teradata_parquet;",
Addressed the following PR review comments:
• Made Azure and Amazon optional dependencies in the Teradata provider.yaml
• Added another parameter to the S3 transfer operator to specify whether the given bucket is public
• Added more examples of the stored procedure operator in the stored procedure system DAG
• Changed """ to " for single-line SQL statements in system test DAGs
• Applied dedent to SQL statements in the cloud transfer operators
Teradata Provider System Health Dashboard: https://teradata.github.io/airflow/
Teradata Provider documentation build status: https://github.com/Teradata/airflow/actions/workflows/ci-teradata-documentation.yml