forked from PyCQA/bandit
/
config.py
244 lines (198 loc) · 9.19 KB
/
config.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# -*- coding:utf-8 -*-
#
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# SPDX-License-Identifier: Apache-2.0
import logging
import yaml
from bandit.core import constants
from bandit.core import extension_loader
from bandit.core import utils
LOG = logging.getLogger(__name__)
class BanditConfig(object):
def __init__(self, config_file=None):
'''Attempt to initialize a config dictionary from a yaml file.
Error out if loading the yaml file fails for any reason.
:param config_file: The Bandit yaml config file
:raises bandit.utils.ConfigError: If the config is invalid or
unreadable.
'''
self.config_file = config_file
self._config = {}
if config_file:
try:
f = open(config_file, 'r')
except IOError:
raise utils.ConfigError("Could not read config file.",
config_file)
if config_file.endswith('.toml'):
import toml
try:
with f:
self._config = toml.load(f)['tool']['bandit']
except toml.TomlDecodeError as err:
LOG.error(err)
raise utils.ConfigError("Error parsing file.", config_file)
else:
try:
with f:
self._config = yaml.safe_load(f)
except yaml.YAMLError as err:
LOG.error(err)
raise utils.ConfigError("Error parsing file.", config_file)
self.validate(config_file)
# valid config must be a dict
if not isinstance(self._config, dict):
raise utils.ConfigError("Error parsing file.", config_file)
self.convert_legacy_config()
else:
# use sane defaults
self._config['plugin_name_pattern'] = '*.py'
self._config['include'] = ['*.py', '*.pyw']
self._init_settings()
def get_option(self, option_string):
'''Returns the option from the config specified by the option_string.
'.' can be used to denote levels, for example to retrieve the options
from the 'a' profile you can use 'profiles.a'
:param option_string: The string specifying the option to retrieve
:return: The object specified by the option_string, or None if it can't
be found.
'''
option_levels = option_string.split('.')
cur_item = self._config
for level in option_levels:
if cur_item and (level in cur_item):
cur_item = cur_item[level]
else:
return None
return cur_item
def get_setting(self, setting_name):
if setting_name in self._settings:
return self._settings[setting_name]
else:
return None
@property
def config(self):
'''Property to return the config dictionary
:return: Config dictionary
'''
return self._config
def _init_settings(self):
'''This function calls a set of other functions (one per setting)
This function calls a set of other functions (one per setting) to build
out the _settings dictionary. Each other function will set values from
the config (if set), otherwise use defaults (from constants if
possible).
:return: -
'''
self._settings = {}
self._init_plugin_name_pattern()
def _init_plugin_name_pattern(self):
'''Sets settings['plugin_name_pattern'] from default or config file.'''
plugin_name_pattern = constants.plugin_name_pattern
if self.get_option('plugin_name_pattern'):
plugin_name_pattern = self.get_option('plugin_name_pattern')
self._settings['plugin_name_pattern'] = plugin_name_pattern
def convert_legacy_config(self):
updated_profiles = self.convert_names_to_ids()
bad_calls, bad_imports = self.convert_legacy_blacklist_data()
if updated_profiles:
self.convert_legacy_blacklist_tests(updated_profiles,
bad_calls, bad_imports)
self._config['profiles'] = updated_profiles
def convert_names_to_ids(self):
'''Convert test names to IDs, unknown names are left unchanged.'''
extman = extension_loader.MANAGER
updated_profiles = {}
for name, profile in (self.get_option('profiles') or {}).items():
# NOTE(tkelsey): can't use default of get() because value is
# sometimes explicity 'None', for example when the list if given in
# yaml but not populated with any values.
include = set((extman.get_plugin_id(i) or i)
for i in (profile.get('include') or []))
exclude = set((extman.get_plugin_id(i) or i)
for i in (profile.get('exclude') or []))
updated_profiles[name] = {'include': include, 'exclude': exclude}
return updated_profiles
def convert_legacy_blacklist_data(self):
'''Detect legacy blacklist data and convert it to new format.'''
bad_calls_list = []
bad_imports_list = []
bad_calls = self.get_option('blacklist_calls') or {}
bad_calls = bad_calls.get('bad_name_sets', {})
for item in bad_calls:
for key, val in item.items():
val['name'] = key
val['message'] = val['message'].replace('{func}', '{name}')
bad_calls_list.append(val)
bad_imports = self.get_option('blacklist_imports') or {}
bad_imports = bad_imports.get('bad_import_sets', {})
for item in bad_imports:
for key, val in item.items():
val['name'] = key
val['message'] = val['message'].replace('{module}', '{name}')
val['qualnames'] = val['imports']
del val['imports']
bad_imports_list.append(val)
if bad_imports_list or bad_calls_list:
LOG.warning('Legacy blacklist data found in config, overriding '
'data plugins')
return bad_calls_list, bad_imports_list
@staticmethod
def convert_legacy_blacklist_tests(profiles, bad_imports, bad_calls):
'''Detect old blacklist tests, convert to use new builtin.'''
def _clean_set(name, data):
if name in data:
data.remove(name)
data.add('B001')
for name, profile in profiles.items():
blacklist = {}
include = profile['include']
exclude = profile['exclude']
name = 'blacklist_calls'
if name in include and name not in exclude:
blacklist.setdefault('Call', []).extend(bad_calls)
_clean_set(name, include)
_clean_set(name, exclude)
name = 'blacklist_imports'
if name in include and name not in exclude:
blacklist.setdefault('Import', []).extend(bad_imports)
blacklist.setdefault('ImportFrom', []).extend(bad_imports)
blacklist.setdefault('Call', []).extend(bad_imports)
_clean_set(name, include)
_clean_set(name, exclude)
_clean_set('blacklist_import_func', include)
_clean_set('blacklist_import_func', exclude)
# This can happen with a legacy config that includes
# blacklist_calls but exclude blacklist_imports for example
if 'B001' in include and 'B001' in exclude:
exclude.remove('B001')
profile['blacklist'] = blacklist
def validate(self, path):
'''Validate the config data.'''
legacy = False
message = ("Config file has an include or exclude reference "
"to legacy test '{0}' but no configuration data for "
"it. Configuration data is required for this test. "
"Please consider switching to the new config file "
"format, the tool 'bandit-config-generator' can help "
"you with this.")
def _test(key, block, exclude, include):
if key in exclude or key in include:
if self._config.get(block) is None:
raise utils.ConfigError(message.format(key), path)
if 'profiles' in self._config:
legacy = True
for profile in self._config['profiles'].values():
inc = profile.get('include') or set()
exc = profile.get('exclude') or set()
_test('blacklist_imports', 'blacklist_imports', inc, exc)
_test('blacklist_import_func', 'blacklist_imports', inc, exc)
_test('blacklist_calls', 'blacklist_calls', inc, exc)
# show deprecation message
if legacy:
LOG.warning("Config file '%s' contains deprecated legacy config "
"data. Please consider upgrading to the new config "
"format. The tool 'bandit-config-generator' can help "
"you with this. Support for legacy configs will be "
"removed in a future bandit version.", path)