-
-
Notifications
You must be signed in to change notification settings - Fork 287
/
vault_loader.py
186 lines (167 loc) · 6.26 KB
/
vault_loader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# docker run -e 'VAULT_DEV_ROOT_TOKEN_ID=myroot' -p 8200:8200 vault
# pip install hvac
from __future__ import annotations
from dynaconf.utils import build_env_list
from dynaconf.utils.parse_conf import parse_conf_data
try:
import boto3
except ImportError:
boto3 = None
try:
from hvac import Client
from hvac.exceptions import InvalidPath
except ImportError:
raise ImportError(
"vault package is not installed in your environment. "
"`pip install dynaconf[vault]` or disable the vault loader with "
"export VAULT_ENABLED_FOR_DYNACONF=false"
)
IDENTIFIER = "vault"
# backwards compatibility
_get_env_list = build_env_list
def get_client(obj):
client = Client(
**{k: v for k, v in obj.VAULT_FOR_DYNACONF.items() if v is not None}
)
if obj.VAULT_ROLE_ID_FOR_DYNACONF is not None:
client.auth_approle(
role_id=obj.VAULT_ROLE_ID_FOR_DYNACONF,
secret_id=obj.get("VAULT_SECRET_ID_FOR_DYNACONF"),
)
elif obj.VAULT_ROOT_TOKEN_FOR_DYNACONF is not None:
client.token = obj.VAULT_ROOT_TOKEN_FOR_DYNACONF
elif obj.VAULT_AUTH_WITH_IAM_FOR_DYNACONF:
if boto3 is None:
raise ImportError(
"boto3 package is not installed in your environment. "
"`pip install boto3` or disable the VAULT_AUTH_WITH_IAM"
)
session = boto3.Session()
credentials = session.get_credentials()
client.auth.aws.iam_login(
credentials.access_key,
credentials.secret_key,
credentials.token,
role=obj.VAULT_AUTH_ROLE_FOR_DYNACONF,
)
assert client.is_authenticated(), (
"Vault authentication error: is VAULT_TOKEN_FOR_DYNACONF or "
"VAULT_ROLE_ID_FOR_DYNACONF defined?"
)
client.kv.default_kv_version = obj.VAULT_KV_VERSION_FOR_DYNACONF
return client
def load(obj, env=None, silent=None, key=None):
"""Reads and loads in to "settings" a single key or all keys from vault
:param obj: the settings instance
:param env: settings env default='DYNACONF'
:param silent: if errors should raise
:param key: if defined load a single key, else load all in env
:return: None
"""
client = get_client(obj)
try:
if obj.VAULT_KV_VERSION_FOR_DYNACONF == 2:
dirs = client.secrets.kv.v2.list_secrets(
path=obj.VAULT_PATH_FOR_DYNACONF,
mount_point=obj.VAULT_MOUNT_POINT_FOR_DYNACONF,
)["data"]["keys"]
else:
dirs = client.secrets.kv.v1.list_secrets(
path=obj.VAULT_PATH_FOR_DYNACONF,
mount_point=obj.VAULT_MOUNT_POINT_FOR_DYNACONF,
)["data"]["keys"]
except InvalidPath:
# The given path is not a directory
dirs = []
# First look for secrets into environments less store
if not obj.ENVIRONMENTS_FOR_DYNACONF:
# By adding '', dynaconf will now read secrets from environments-less
# store which are not written by `dynaconf write` to Vault store
env_list = [obj.MAIN_ENV_FOR_DYNACONF.lower(), ""]
# Finally, look for secret into all the environments
else:
env_list = dirs + build_env_list(obj, env)
for env in env_list:
path = "/".join([obj.VAULT_PATH_FOR_DYNACONF, env])
try:
if obj.VAULT_KV_VERSION_FOR_DYNACONF == 2:
data = client.secrets.kv.v2.read_secret_version(
path, mount_point=obj.VAULT_MOUNT_POINT_FOR_DYNACONF
)
else:
data = client.secrets.kv.read_secret(
"data/" + path,
mount_point=obj.VAULT_MOUNT_POINT_FOR_DYNACONF,
)
except InvalidPath:
# If the path doesn't exist, ignore it and set data to None
data = None
if data:
# There seems to be a data dict within a data dict,
# extract the inner data
data = data.get("data", {}).get("data", {})
try:
if (
obj.VAULT_KV_VERSION_FOR_DYNACONF == 2
and obj.ENVIRONMENTS_FOR_DYNACONF
and data
):
data = data.get("data", {})
if data and key:
value = parse_conf_data(
data.get(key), tomlfy=True, box_settings=obj
)
if value:
obj.set(key, value)
elif data:
obj.update(data, loader_identifier=IDENTIFIER, tomlfy=True)
except Exception:
if silent:
return False
raise
def write(obj, data=None, **kwargs):
"""Write a value in to loader source
:param obj: settings object
:param data: vars to be stored
:param kwargs: vars to be stored
:return:
"""
if obj.VAULT_ENABLED_FOR_DYNACONF is False:
raise RuntimeError(
"Vault is not configured \n"
"export VAULT_ENABLED_FOR_DYNACONF=true\n"
"and configure the VAULT_FOR_DYNACONF_* variables"
)
data = data or {}
data.update(kwargs)
if not data:
raise AttributeError("Data must be provided")
data = {"data": data}
client = get_client(obj)
if obj.VAULT_KV_VERSION_FOR_DYNACONF == 1:
mount_point = obj.VAULT_MOUNT_POINT_FOR_DYNACONF + "/data"
else:
mount_point = obj.VAULT_MOUNT_POINT_FOR_DYNACONF
path = "/".join([obj.VAULT_PATH_FOR_DYNACONF, obj.current_env.lower()])
client.secrets.kv.create_or_update_secret(
path, secret=data, mount_point=mount_point
)
load(obj)
def list_envs(obj, path=""):
"""
This function is a helper to get a list of all the existing envs in
the source of data, the use case is:
existing_envs = vault_loader.list_envs(settings)
for env in exiting_envs:
with settings.using_env(env): # switch to the env
# do something with a key of that env
:param obj: settings object
:param path: path to the vault secrets
:return: list containing all the keys at the given path
"""
client = get_client(obj)
path = path or obj.get("VAULT_PATH_FOR_DYNACONF")
try:
return client.list(f"/secret/metadata/{path}")["data"]["keys"]
except TypeError:
return []