Skip to content

Commit

Permalink
Merge branch 'main' into AIRFLOW-26925
Browse files Browse the repository at this point in the history
  • Loading branch information
flolas committed Apr 27, 2024
2 parents c71e219 + 48c98bc commit 917d3b9
Show file tree
Hide file tree
Showing 7 changed files with 285 additions and 44 deletions.
6 changes: 4 additions & 2 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1250,8 +1250,9 @@ def _log_state(*, task_instance: TaskInstance | TaskInstancePydantic, lead_msg:
str(task_instance.state).upper(),
task_instance.dag_id,
task_instance.task_id,
task_instance.run_id,
]
message = "%sMarking task as %s. dag_id=%s, task_id=%s, "
message = "%sMarking task as %s. dag_id=%s, task_id=%s, run_id=%s, "
if task_instance.map_index >= 0:
params.append(task_instance.map_index)
message += "map_index=%d, "
Expand Down Expand Up @@ -2558,9 +2559,10 @@ def _run_raw_task(
raise
self.defer_task(defer=defer, session=session)
self.log.info(
"Pausing task as DEFERRED. dag_id=%s, task_id=%s, execution_date=%s, start_date=%s",
"Pausing task as DEFERRED. dag_id=%s, task_id=%s, run_id=%s, execution_date=%s, start_date=%s",
self.dag_id,
self.task_id,
self.run_id,
_date_or_empty(task_instance=self, attr="execution_date"),
_date_or_empty(task_instance=self, attr="start_date"),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,5 @@ def execute(self, context: Context) -> None:
rows=rows,
target_fields=self.selected_fields,
replace=self.replace,
commit_every=self.batch_size,
)
12 changes: 7 additions & 5 deletions airflow/providers/hashicorp/_internal_client/vault_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ class _VaultClient(LoggingMixin):
:param key_id: Key ID for Authentication (for ``aws_iam`` and ''azure`` auth_type).
:param secret_id: Secret ID for Authentication (for ``approle``, ``aws_iam`` and ``azure`` auth_types).
:param role_id: Role ID for Authentication (for ``approle``, ``aws_iam`` auth_types).
:param role_arn: AWS arn role (for ``aws_iam`` auth_type)
:param assume_role_kwargs: AWS assume role param.
See AWS STS Docs:
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sts/client/assume_role.html
:param kubernetes_role: Role for Authentication (for ``kubernetes`` auth_type).
:param kubernetes_jwt_path: Path for kubernetes jwt token (for ``kubernetes`` auth_type, default:
``/var/run/secrets/kubernetes.io/serviceaccount/token``).
Expand Down Expand Up @@ -104,7 +106,7 @@ def __init__(
password: str | None = None,
key_id: str | None = None,
secret_id: str | None = None,
role_arn: str | None = None,
assume_role_kwargs: dict | None = None,
role_id: str | None = None,
kubernetes_role: str | None = None,
kubernetes_jwt_path: str | None = "/var/run/secrets/kubernetes.io/serviceaccount/token",
Expand Down Expand Up @@ -163,7 +165,7 @@ def __init__(
self.key_id = key_id
self.secret_id = secret_id
self.role_id = role_id
self.role_arn = role_arn
self.assume_role_kwargs = assume_role_kwargs
self.kubernetes_role = kubernetes_role
self.kubernetes_jwt_path = kubernetes_jwt_path
self.gcp_key_path = gcp_key_path
Expand Down Expand Up @@ -330,9 +332,9 @@ def _auth_aws_iam(self, _client: hvac.Client) -> None:
else:
import boto3

if self.role_arn:
if self.assume_role_kwargs:
sts_client = boto3.client("sts")
credentials = sts_client.assume_role(RoleArn=self.role_arn, RoleSessionName="airflow")
credentials = sts_client.assume_role(**self.assume_role_kwargs)
auth_args = {
"access_key": credentials["Credentials"]["AccessKeyId"],
"secret_key": credentials["Credentials"]["SecretAccessKey"],
Expand Down
8 changes: 5 additions & 3 deletions airflow/providers/hashicorp/secrets/vault.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ class VaultBackend(BaseSecretsBackend, LoggingMixin):
:param key_id: Key ID for Authentication (for ``aws_iam`` and ''azure`` auth_type).
:param secret_id: Secret ID for Authentication (for ``approle``, ``aws_iam`` and ``azure`` auth_types).
:param role_id: Role ID for Authentication (for ``approle``, ``aws_iam`` auth_types).
:param role_arn: AWS arn role,
:param assume_role_kwargs: AWS assume role param.
See AWS STS Docs:
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sts/client/assume_role.html
:param kubernetes_role: Role for Authentication (for ``kubernetes`` auth_type).
:param kubernetes_jwt_path: Path for kubernetes jwt token (for ``kubernetes`` auth_type, default:
``/var/run/secrets/kubernetes.io/serviceaccount/token``).
Expand Down Expand Up @@ -108,7 +110,7 @@ def __init__(
key_id: str | None = None,
secret_id: str | None = None,
role_id: str | None = None,
role_arn: str | None = None,
assume_role_kwargs: dict | None = None,
kubernetes_role: str | None = None,
kubernetes_jwt_path: str = "/var/run/secrets/kubernetes.io/serviceaccount/token",
gcp_key_path: str | None = None,
Expand Down Expand Up @@ -149,7 +151,7 @@ def __init__(
key_id=key_id,
secret_id=secret_id,
role_id=role_id,
role_arn=role_arn,
assume_role_kwargs=assume_role_kwargs,
kubernetes_role=kubernetes_role,
kubernetes_jwt_path=kubernetes_jwt_path,
gcp_key_path=gcp_key_path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,14 +220,15 @@ Add "verify": "absolute path to ca-certificate file"
Vault authentication with AWS Assume Role STS
"""""""""""""""""""""""""""""""""""""""""""""

Add parameter "role_arn": "The AWS ARN of the role to assume"
Add parameter "assume_role_kwargs": "The AWS STS assume role auth parameter dict"

For more details, please refer to the AWS Assume Role Authentication documentation: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sts/client/assume_role.html

.. code-block:: ini
[secrets]
backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
backend_kwargs = {"connections_path": "airflow-connections", "variables_path": null, "mount_point": "airflow", "url": "http://127.0.0.1:8200", "auth_type": "aws_iam", "role_arn": "arn:aws:iam::123456789000:role/hashicorp-aws-iam-role"}
backend_kwargs = {"connections_path": "airflow-connections", "variables_path": null, "mount_point": "airflow", "url": "http://127.0.0.1:8200", "auth_type": "aws_iam", "assume_role_kwargs": {"arn:aws:iam::123456789000:role/hashicorp-aws-iam-role", "RoleSessionName": "Airflow"}}
Using multiple mount points
"""""""""""""""""""""""""""
Expand Down
9 changes: 6 additions & 3 deletions tests/cli/commands/test_task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,10 @@ def test_test_with_existing_dag_run(self, caplog):
args = self.parser.parse_args(["tasks", "test", self.dag_id, task_id, DEFAULT_DATE.isoformat()])
with caplog.at_level("INFO", logger="airflow.task"):
task_command.task_test(args)
assert f"Marking task as SUCCESS. dag_id={self.dag_id}, task_id={task_id}" in caplog.text
assert (
f"Marking task as SUCCESS. dag_id={self.dag_id}, task_id={task_id}, run_id={self.run_id}, "
in caplog.text
)

@pytest.mark.enable_redact
@pytest.mark.filterwarnings("ignore::airflow.utils.context.AirflowContextDeprecationWarning")
Expand Down Expand Up @@ -828,7 +831,7 @@ def test_logging_with_run_task(self):

assert (
f"INFO - Marking task as SUCCESS. dag_id={self.dag_id}, "
f"task_id={self.task_id}, execution_date=20170101T000000" in logs
f"task_id={self.task_id}, run_id={self.run_id}, execution_date=20170101T000000" in logs
)

@pytest.mark.skipif(not hasattr(os, "fork"), reason="Forking not available")
Expand Down Expand Up @@ -868,7 +871,7 @@ def test_logging_with_run_task_subprocess(self):
assert f"INFO - Running: ['airflow', 'tasks', 'run', '{self.dag_id}', '{self.task_id}'," in logs
assert (
f"INFO - Marking task as SUCCESS. dag_id={self.dag_id}, "
f"task_id={self.task_id}, execution_date=20170101T000000" in logs
f"task_id={self.task_id}, run_id={self.run_id}, execution_date=20170101T000000" in logs
)

def test_log_file_template_with_run_task(self):
Expand Down

0 comments on commit 917d3b9

Please sign in to comment.