Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable Connection creation from Vault parameters #15013

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
41 changes: 36 additions & 5 deletions airflow/providers/hashicorp/secrets/vault.py
Expand Up @@ -18,6 +18,7 @@
"""Objects relating to sourcing connections & variables from Hashicorp Vault"""
from typing import Optional

from airflow.models.connection import Connection
from airflow.providers.hashicorp._internal_client.vault_client import _VaultClient
from airflow.secrets import BaseSecretsBackend
from airflow.utils.log.logging_mixin import LoggingMixin
Expand Down Expand Up @@ -178,6 +179,20 @@ def __init__(
**kwargs,
)

def get_response(self, conn_id: str) -> Optional[dict]:
"""
Get data from Vault

:type conn_id: str
:rtype: dict
:return: The data from the Vault path if exists
"""
if self.connections_path is None:
return None

secret_path = self.build_path(self.connections_path, conn_id)
return self.vault_client.get_secret(secret_path=secret_path)

def get_conn_uri(self, conn_id: str) -> Optional[str]:
"""
Get secret value from Vault. Store the secret in the form of URI
Expand All @@ -187,12 +202,28 @@ def get_conn_uri(self, conn_id: str) -> Optional[str]:
:rtype: str
:return: The connection uri retrieved from the secret
"""
if self.connections_path is None:
response = self.get_response(conn_id)

return response.get("conn_uri") if response else None

def get_connection(self, conn_id: str) -> Optional[Connection]:
"""
Get connection from Vault as secret. Prioritize conn_uri if exists,
if not fall back to normal Connection creation.

:type conn_id: str
:rtype: Connection
:return: A Connection object constructed from Vault data
"""
response = self.get_response(conn_id)
if response is None:
return None
else:
secret_path = self.build_path(self.connections_path, conn_id)
response = self.vault_client.get_secret(secret_path=secret_path)
return response.get("conn_uri") if response else None

uri = response.get("conn_uri")
if uri:
return Connection(conn_id, uri=uri)

return Connection(conn_id, **response)

def get_variable(self, key: str) -> Optional[str]:
"""
Expand Down
Expand Up @@ -44,6 +44,15 @@ key to ``backend_kwargs``:

export VAULT_ADDR="http://127.0.0.1:8200"

Set up a Vault mount point
""""""""""""""""""""""""""

You can make a ``mount_point`` for ``airflow`` as follows:

.. code-block:: bash

vault secrets enable -path=airflow -version=2 kv

Optional lookup
"""""""""""""""

Expand All @@ -60,8 +69,8 @@ For example, if you want to set parameter ``connections_path`` to ``"airflow-con
backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
backend_kwargs = {"connections_path": "airflow-connections", "variables_path": null, "mount_point": "airflow", "url": "http://127.0.0.1:8200"}

Storing and Retrieving Connections
""""""""""""""""""""""""""""""""""
Storing and Retrieving Connections using connection URI representation
""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""

If you have set ``connections_path`` as ``connections`` and ``mount_point`` as ``airflow``, then for a connection id of
``smtp_default``, you would want to store your secret as:
Expand All @@ -73,12 +82,6 @@ If you have set ``connections_path`` as ``connections`` and ``mount_point`` as `
Note that the ``Key`` is ``conn_uri``, ``Value`` is ``postgresql://airflow:airflow@host:5432/airflow`` and
``mount_point`` is ``airflow``.

You can make a ``mount_point`` for ``airflow`` as follows:

.. code-block:: bash

vault secrets enable -path=airflow -version=2 kv

Verify that you can get the secret from ``vault``:

.. code-block:: console
Expand All @@ -100,6 +103,40 @@ Verify that you can get the secret from ``vault``:
The value of the Vault key must be the :ref:`connection URI representation <generating_connection_uri>`
of the connection object to get connection.

Storing and Retrieving Connections using Connection class representation
""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""

If you have set ``connections_path`` as ``connections`` and ``mount_point`` as ``airflow``, then for a connection id of
``smtp_default``, you would want to store your secret as:

.. code-block:: bash

vault kv put airflow/connections/smtp_default conn_type=smtps login=user password=host host=relay.example.com port=465

Note that the ``Keys`` are parameters of the ``Connection`` class and the ``Value`` their argument.

Verify that you can get the secret from ``vault``:

.. code-block:: console

❯ vault kv get airflow/connections/smtp_default
====== Metadata ======
Key Value
--- -----
created_time 2020-03-19T19:17:51.281721Z
deletion_time n/a
destroyed false
version 1

====== Data ======
Key Value
--- -----
conn_type smtps
login user
password host
host relay.example.com
port 465

Storing and Retrieving Variables
""""""""""""""""""""""""""""""""

Expand Down
43 changes: 43 additions & 0 deletions tests/providers/hashicorp/secrets/test_vault.py
Expand Up @@ -59,6 +59,49 @@ def test_get_conn_uri(self, mock_hvac):
returned_uri = test_client.get_conn_uri(conn_id="test_postgres")
assert 'postgresql://airflow:airflow@host:5432/airflow' == returned_uri

@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
def test_get_connection(self, mock_hvac):
mock_client = mock.MagicMock()
mock_hvac.Client.return_value = mock_client
mock_client.secrets.kv.v2.read_secret_version.return_value = {
'request_id': '94011e25-f8dc-ec29-221b-1f9c1d9ad2ae',
'lease_id': '',
'renewable': False,
'lease_duration': 0,
'data': {
'data': {
'conn_type': 'postgresql',
'login': 'airflow',
'password': 'airflow',
'host': 'host',
'port': '5432',
'schema': 'airflow',
'extra': '{"foo":"bar","baz":"taz"}',
},
'metadata': {
'created_time': '2020-03-16T21:01:43.331126Z',
'deletion_time': '',
'destroyed': False,
'version': 1,
},
},
'wrap_info': None,
'warnings': None,
'auth': None,
}

kwargs = {
"connections_path": "connections",
"mount_point": "airflow",
"auth_type": "token",
"url": "http://127.0.0.1:8200",
"token": "s.7AU0I51yv1Q1lxOIg1F3ZRAS",
}

test_client = VaultBackend(**kwargs)
connection = test_client.get_connection(conn_id="test_postgres")
assert 'postgresql://airflow:airflow@host:5432/airflow?foo=bar&baz=taz' == connection.get_uri()

@mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
def test_get_conn_uri_engine_version_1(self, mock_hvac):
mock_client = mock.MagicMock()
Expand Down