forked from apache/airflow
/
secrets_masker.py
250 lines (208 loc) · 8.96 KB
/
secrets_masker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Mask sensitive information from logs"""
import collections
import logging
import re
from typing import TYPE_CHECKING, Iterable, Optional, Set, TypeVar, Union
try:
# 3.8+
from functools import cached_property
except ImportError:
from cached_property import cached_property
try:
# 3.9+
from functools import cache
except ImportError:
from functools import lru_cache
cache = lru_cache(maxsize=None)
if TYPE_CHECKING:
from airflow.typing_compat import RePatternType
# Type variable referenced (as a string annotation) by the redaction helpers in this module.
# The name passed to ``TypeVar`` must match the variable it is bound to.
RedactableItem = TypeVar('RedactableItem')

log = logging.getLogger(__name__)


DEFAULT_SENSITIVE_FIELDS = frozenset(
    {
        'access_token',
        'api_key',
        'apikey',
        'authorization',
        'passphrase',
        'passwd',
        'password',
        'private_key',
        'secret',
    }
)
"""Names of fields (Connection extra, Variable key name etc.) that are deemed sensitive"""
@cache
def get_sensitive_variables_fields():
    """Return the names of fields whose values should be treated as sensitive.

    The result is ``DEFAULT_SENSITIVE_FIELDS`` plus the comma-separated names
    read from the ``[core] sensitive_var_conn_names`` option of airflow.cfg.
    Because of the ``@cache`` decorator the configuration is only read on the
    first call.

    :return: frozenset of lower-case name fragments.
    """
    # Imported lazily, at call time rather than module import time.
    from airflow.configuration import conf

    sensitive_fields = DEFAULT_SENSITIVE_FIELDS
    sensitive_variable_fields = conf.get('core', 'sensitive_var_conn_names')
    if sensitive_variable_fields:
        # Names are lower-cased because ``should_hide_value_for_key`` lower-cases
        # the candidate before doing its substring comparison.
        configured = (field.strip().lower() for field in sensitive_variable_fields.split(','))
        # Empty entries (e.g. from a trailing comma) must be dropped: an empty
        # string is a substring of every name and would mark everything sensitive.
        sensitive_fields = sensitive_fields | frozenset(field for field in configured if field)
    return sensitive_fields
def should_hide_value_for_key(name):
    """Tell whether the value stored under ``name`` has to be hidden.

    ``name`` is typically a Variable name or a key of ``conn.extra_dejson``.
    It is considered sensitive when, once stripped and lower-cased, it contains
    any of the fragments returned by :func:`get_sensitive_variables_fields`.
    Non-string names, or a disabled ``HIDE_SENSITIVE_VAR_CONN_FIELDS`` setting,
    always yield ``False``.
    """
    from airflow import settings

    if not isinstance(name, str) or not settings.HIDE_SENSITIVE_VAR_CONN_FIELDS:
        return False

    normalized = name.strip().lower()
    for fragment in get_sensitive_variables_fields():
        if fragment in normalized:
            return True
    return False
def mask_secret(secret: Union[str, dict, Iterable], name: str = None) -> None:
    """
    Register ``secret`` so that it no longer shows up in the task logs.

    When ``name`` is given, the secret is only registered if that name matches
    one of the configured "sensitive" names.

    A dict or any non-string iterable is walked recursively, and the values
    found under keys with sensitive names are hidden.
    """
    # Imported lazily, at call time
    from airflow import settings

    # Running every log message through the filter has a cost, so masks are
    # only registered when the feature is switched on and there is actually
    # something to mask.
    masking_enabled = settings.MASK_SECRETS_IN_LOGS
    if masking_enabled and secret:
        _secrets_masker().add_mask(secret, name)
def redact(value: "RedactableItem", name: str = None) -> "RedactableItem":
    """Return ``value`` with any known secrets redacted, using the shared masker."""
    masker = _secrets_masker()
    return masker.redact(value, name)
@cache
def _secrets_masker() -> "SecretsMasker":
    """Locate the ``SecretsMasker`` filter attached to the ``airflow.task`` logger.

    The lookup result is memoised by ``@cache``.

    :raises RuntimeError: if no such filter is installed on that logger.
    """
    task_logger = logging.getLogger('airflow.task')
    masker = next(
        (candidate for candidate in task_logger.filters if isinstance(candidate, SecretsMasker)),
        None,
    )
    if masker is None:
        raise RuntimeError("No SecretsMasker found!")
    return masker
class SecretsMasker(logging.Filter):
    """Redact secrets from logs.

    Secrets are registered with :meth:`add_mask`. Every registered secret is
    regex-escaped and combined into a single compiled alternation
    (``replacer``) that :meth:`filter` applies to the message, arguments and
    extra attributes of each log record.
    """

    # Compiled alternation of all registered secrets; ``None`` until one is added.
    replacer: Optional["RePatternType"] = None
    # ``re.escape``-d form of every registered secret.
    patterns: Set[str]

    # Attribute set on a record once it has been processed by this filter.
    ALREADY_FILTERED_FLAG = "__SecretsMasker_filtered"
    # Nesting depth beyond which containers are no longer walked.
    MAX_RECURSION_DEPTH = 5

    def __init__(self):
        super().__init__()
        self.patterns = set()

    @cached_property
    def _record_attrs_to_ignore(self) -> Iterable[str]:
        """Names of the standard ``LogRecord`` attributes, except ``msg`` and ``args``."""
        # Doing log.info(..., extra={'foo': 2}) sets extra properties on
        # record, i.e. record.foo. And we need to filter those too. Fun
        #
        # Create a record, and look at what attributes are on it, and ignore
        # all the default ones!

        record = logging.getLogRecordFactory()(
            # name, level, pathname, lineno, msg, args, exc_info, func=None, sinfo=None,
            "x",
            logging.INFO,
            __file__,
            1,
            "",
            tuple(),
            exc_info=None,
            func="funcname",
        )
        return frozenset(record.__dict__).difference({'msg', 'args'})

    def filter(self, record) -> bool:
        """Redact the record in place; always returns ``True`` so it is still emitted."""
        if self.ALREADY_FILTERED_FLAG in record.__dict__:
            # Filters are attached to multiple handlers and logs, keep a
            # "private" flag that stops us needing to process it more than once
            return True

        if self.replacer:
            # Only existing keys are reassigned, so the dict does not change
            # size while it is being iterated.
            for k, v in record.__dict__.items():
                if k in self._record_attrs_to_ignore:
                    continue
                record.__dict__[k] = self.redact(v)
            if record.exc_info and record.exc_info[1] is not None:
                exc = record.exc_info[1]
                # I'm not sure if this is a good idea!
                # Assign a real tuple rather than a lazy generator.
                exc.args = tuple(self.redact(v) for v in exc.args)
        record.__dict__[self.ALREADY_FILTERED_FLAG] = True

        return True

    def _redact_all(self, item: "RedactableItem", depth: int) -> "RedactableItem":
        """Replace every string inside ``item`` with ``'***'``.

        Dicts keep their keys, lists stay lists, tuples and sets both come back
        as tuples, and anything else is returned untouched. Whatever sits
        deeper than ``MAX_RECURSION_DEPTH`` is replaced by ``'***'`` as a whole.
        """
        if depth > self.MAX_RECURSION_DEPTH or isinstance(item, str):
            return '***'
        if isinstance(item, dict):
            return {dict_key: self._redact_all(subval, depth + 1) for dict_key, subval in item.items()}
        elif isinstance(item, (tuple, set)):
            # Turn set in to tuple!
            return tuple(self._redact_all(subval, depth + 1) for subval in item)
        elif isinstance(item, list):
            return [self._redact_all(subval, depth + 1) for subval in item]
        else:
            return item

    # pylint: disable=too-many-return-statements
    def _redact(self, item: "RedactableItem", name: Optional[str], depth: int) -> "RedactableItem":
        """Recursive worker behind :meth:`redact`.

        :param item: value to redact
        :param name: key under which ``item`` was found, if any
        :param depth: current nesting level, starting at 0
        :return: redacted copy for str/dict/list/tuple/set, ``item`` itself otherwise
        """
        # Avoid spending too much effort on redacting on deeply nested
        # structures. This also avoid infinite recursion if a structure has
        # reference to self.
        if depth > self.MAX_RECURSION_DEPTH:
            return item
        try:
            if name and should_hide_value_for_key(name):
                return self._redact_all(item, depth)
            if isinstance(item, dict):
                return {
                    dict_key: self._redact(subval, name=dict_key, depth=(depth + 1))
                    for dict_key, subval in item.items()
                }
            elif isinstance(item, str):
                if self.replacer:
                    # We can't replace specific values, but the key-based redacting
                    # can still happen, so we can't short-circuit, we need to walk
                    # the structure.
                    return self.replacer.sub('***', item)
                return item
            elif isinstance(item, (tuple, set)):
                # Turn set in to tuple!
                return tuple(self._redact(subval, name=None, depth=(depth + 1)) for subval in item)
            elif isinstance(item, list):
                return [self._redact(subval, name=None, depth=(depth + 1)) for subval in item]
            else:
                return item
        # I think this should never happen, but it does not hurt to leave it just in case
        except Exception as e:  # pylint: disable=broad-except
            # Only the type is reported: the value could not be redacted, so
            # writing its repr to the log could expose the secret itself.
            log.warning(
                "Unable to redact value of type %s, please report this via "
                "<https://github.com/apache/airflow/issues>. Error was: %s: %s",
                type(item).__name__,
                type(e).__name__,
                str(e),
            )
            return item

    def redact(self, item: "RedactableItem", name: Optional[str] = None) -> "RedactableItem":
        """Redact any secrets found in ``item``.

        Strings have every registered secret replaced by ``'***'``; dicts,
        lists, tuples and sets are walked recursively.

        If ``name`` is given, and it's a "sensitive" name (see
        :func:`should_hide_value_for_key`) then all string values in the item
        are redacted.
        """
        return self._redact(item, name, depth=0)

    # pylint: enable=too-many-return-statements

    def add_mask(self, secret: Union[str, dict, Iterable], name: Optional[str] = None):
        """Add a new secret to be masked to this filter instance.

        :param secret: a string, a dict (each value is added under its own key
            as name), or another iterable (each element is added under ``name``)
        :param name: when given, a string secret is only registered if the name
            is sensitive according to :func:`should_hide_value_for_key`
        """
        if isinstance(secret, dict):
            for k, v in secret.items():
                self.add_mask(v, k)
        elif isinstance(secret, str):
            if not secret:
                return
            pattern = re.escape(secret)
            if pattern not in self.patterns and (not name or should_hide_value_for_key(name)):
                self.patterns.add(pattern)
                # Regex alternation takes the first alternative that matches, so
                # longer secrets go first: otherwise a secret that is a prefix of
                # another one would match and leave the rest of the longer secret
                # visible. The pattern itself is a tie-breaker for a stable order.
                ordered = sorted(self.patterns, key=lambda escaped: (-len(escaped), escaped))
                self.replacer = re.compile('|'.join(ordered))
        elif isinstance(secret, collections.abc.Iterable):
            for v in secret:
                self.add_mask(v, name)