Skip to content

Commit

Permalink
Suggest fixes for issues
Browse files Browse the repository at this point in the history
This change introduces a new feature that will suggest a fix in
the form of a line of code as replacement for the line range of
the issue.

This is the first step to have the ability to auto-correct problems
detected. Later more changes can be merged to modify the file with
the suggested fix.

The Issue class has a new fix string attribute that denotes how
the lines of affected code can be replaced. This suggested fix
will not preserve code comments and possibly other optimizations
the AST does not capture.

Closes PyCQA#439

Signed-off-by: Eric Brown <browne@vmware.com>
  • Loading branch information
ericwb committed Mar 19, 2023
1 parent 02d73e9 commit a78785e
Show file tree
Hide file tree
Showing 14 changed files with 205 additions and 4 deletions.
52 changes: 52 additions & 0 deletions bandit/core/context.py
Expand Up @@ -3,9 +3,14 @@
#
# SPDX-License-Identifier: Apache-2.0
import ast
import linecache
import sys

from bandit.core import utils

if sys.version_info < (3, 9):
import astunparse


class Context:
def __init__(self, context_object=None):
Expand Down Expand Up @@ -311,6 +316,53 @@ def is_module_imported_like(self, module):
return True
return False

def get_outer_text(self):
"""Get the text to the left and right of the node in context.
Gets the text to the left and text to the right of the node in
context. This function depends on knowing the line range, col_offset,
and end_col_offset.
:return: outer text as tuple
"""
lineno = self._context.get("linerange")[0]
end_lineno = self._context.get("linerange")[-1]
col_offset = self._context.get("col_offset")
end_col_offset = self._context.get("end_col_offset")

if self._context.get("filename") == "<stdin>":
self._context.get("file_data").seek(0)
for line_num in range(1, lineno):
self._context.get("file_data").readline()
line = self._context.get("file_data").readline()
end_line = line
if end_lineno > lineno:
for line_num in range(1, end_lineno):
self._context.get("file_data").readline()
end_line = self._context.get("file_data").readline()
else:
line = linecache.getline(self._context.get("filename"), lineno)
end_line = linecache.getline(
self._context.get("filename"), end_lineno
)

return (line[:col_offset], end_line[end_col_offset:])

def unparse(self, transformer):
"""Unparse an ast node using given transformer
:param transformer: NodeTransformer that fixes the ast
:return: node as statement string
"""
fixed_node = ast.fix_missing_locations(transformer)
outer_text = self.get_outer_text()
if sys.version_info >= (3, 9):
return outer_text[0] + ast.unparse(fixed_node) + outer_text[1]
else:
return (
outer_text[0] + astunparse.unparse(fixed_node) + outer_text[1]
)

@property
def filename(self):
return self._context.get("filename")
Expand Down
10 changes: 9 additions & 1 deletion bandit/core/issue.py
Expand Up @@ -86,6 +86,10 @@ def __init__(
test_id="",
col_offset=0,
end_col_offset=0,
<<<<<<< HEAD
=======
fix=None,
>>>>>>> 87ce057 (Suggest fixes for issues)
):
self.severity = severity
self.cwe = Cwe(cwe)
Expand All @@ -102,6 +106,7 @@ def __init__(
self.col_offset = col_offset
self.end_col_offset = end_col_offset
self.linerange = []
self.fix = fix

def __str__(self):
return (
Expand Down Expand Up @@ -194,7 +199,7 @@ def get_code(self, max_lines=3, tabbed=False):
if not len(text):
break
lines.append(tmplt % (line, text))
return "".join(lines)
return "".join(lines).rstrip()

def as_dict(self, with_code=True, max_lines=3):
"""Convert the issue to a dict of values for outputting."""
Expand All @@ -214,6 +219,8 @@ def as_dict(self, with_code=True, max_lines=3):

if with_code:
out["code"] = self.get_code(max_lines=max_lines)
if self.fix:
out["fix"] = self.fix
return out

def from_dict(self, data, with_code=True):
Expand All @@ -229,6 +236,7 @@ def from_dict(self, data, with_code=True):
self.linerange = data["line_range"]
self.col_offset = data.get("col_offset", 0)
self.end_col_offset = data.get("end_col_offset", 0)
self.fix = data.get("fix")


def cwe_from_dict(data):
Expand Down
4 changes: 1 addition & 3 deletions bandit/core/node_visitor.py
Expand Up @@ -14,7 +14,7 @@
LOG = logging.getLogger(__name__)


class BanditNodeVisitor:
class BanditNodeVisitor(ast.NodeTransformer):
def __init__(
self, fname, fdata, metaast, testset, debug, nosec_lines, metrics
):
Expand Down Expand Up @@ -66,7 +66,6 @@ def visit_FunctionDef(self, node):
:param node: The node that is being inspected
:return: -
"""

self.context["function"] = node
qualname = self.namespace + "." + b_utils.get_func_name(node)
name = qualname.split(".")[-1]
Expand All @@ -87,7 +86,6 @@ def visit_Call(self, node):
:param node: The node that is being inspected
:return: -
"""

self.context["call"] = node
qualname = b_utils.get_call_name(node, self.import_aliases)
name = qualname.split(".")[-1]
Expand Down
14 changes: 14 additions & 0 deletions bandit/formatters/html.py
Expand Up @@ -271,9 +271,19 @@ def report(manager, fileobj, sev_level, conf_level, lines=-1):
<b>Line number: </b>{line_number}<br>
<b>More info: </b><a href="{url}" target="_blank">{url}</a><br>
{code}
<b>Suggested Fix:</b><br>
{fix}
{candidates}
</div>
</div>
"""

fix_block = """
<div class="fix">
<pre>
{fix}
</pre>
</div>
"""

code_block = """
Expand Down Expand Up @@ -358,6 +368,9 @@ def report(manager, fileobj, sev_level, conf_level, lines=-1):
candidates = candidate_block.format(candidate_list=candidates_str)

url = docs_utils.get_url(issue.test_id)
fix = (
fix_block.format(fix=html_escape(issue.fix)) if issue.fix else None
)
results_str += issue_block.format(
issue_no=index,
issue_class=f"issue-sev-{issue.severity.lower()}",
Expand All @@ -373,6 +386,7 @@ def report(manager, fileobj, sev_level, conf_level, lines=-1):
candidates=candidates,
url=url,
line_number=issue.lineno,
fix=fix,
)

# build the metrics string to insert in the report
Expand Down
7 changes: 7 additions & 0 deletions bandit/formatters/screen.py
Expand Up @@ -146,6 +146,13 @@ def _output_issue_str(
[indent + line for line in issue.get_code(lines, True).split("\n")]
)

if issue.fix:
bits.append(
f"{indent} {COLOR[issue.severity]}"
f"Suggested Fix:{COLOR['DEFAULT']}"
)
bits.append(f"\t{issue.fix}")

return "\n".join([bit for bit in bits])


Expand Down
4 changes: 4 additions & 0 deletions bandit/formatters/text.py
Expand Up @@ -113,6 +113,10 @@ def _output_issue_str(
[indent + line for line in issue.get_code(lines, True).split("\n")]
)

if issue.fix:
bits.append(f"{indent} Suggested Fix:")
bits.append(f"\t{issue.fix}")

return "\n".join([bit for bit in bits])


Expand Down
3 changes: 3 additions & 0 deletions bandit/plugins/app_debug.py
Expand Up @@ -52,6 +52,8 @@ def flask_debug_true(context):
if context.is_module_imported_like("flask"):
if context.call_function_name_qual.endswith(".run"):
if context.check_call_arg_value("debug", "True"):
context.node.keywords[0].value.value = False

return bandit.Issue(
severity=bandit.HIGH,
confidence=bandit.MEDIUM,
Expand All @@ -60,4 +62,5 @@ def flask_debug_true(context):
"which exposes the Werkzeug debugger and allows "
"the execution of arbitrary code.",
lineno=context.get_lineno_for_call_arg("debug"),
fix=context.unparse(context.node),
)
3 changes: 3 additions & 0 deletions bandit/plugins/crypto_request_no_cert_validation.py
Expand Up @@ -65,11 +65,14 @@ def request_with_no_cert_validation(context):
and context.call_function_name in HTTPX_ATTRS
):
if context.check_call_arg_value("verify", "False"):
context.node.keywords[0].value.value = True

return bandit.Issue(
severity=bandit.HIGH,
confidence=bandit.HIGH,
cwe=issue.Cwe.IMPROPER_CERT_VALIDATION,
text=f"Call to {qualname} with verify=False disabling SSL "
"certificate checks, security issue.",
lineno=context.get_lineno_for_call_arg("verify"),
fix=context.unparse(context.node),
)
28 changes: 28 additions & 0 deletions bandit/plugins/hashlib_insecure_functions.py
Expand Up @@ -41,6 +41,7 @@
CWE information added
""" # noqa: E501
import ast
import sys

import bandit
Expand All @@ -51,6 +52,18 @@
WEAK_HASHES = ("md4", "md5", "sha", "sha1")


def transform(node):
found = False
for keyword in node.keywords:
if keyword.arg == "usedforsecurity":
keyword.value.value = False
found = True
if not found:
keyword = ast.keyword("usedforsecurity", ast.Constant(False))
node.keywords.append(keyword)
return node


def _hashlib_func(context):
if isinstance(context.call_function_name_qual, str):
qualname_list = context.call_function_name_qual.split(".")
Expand All @@ -68,6 +81,7 @@ def _hashlib_func(context):
text=f"Use of weak {func.upper()} hash for security. "
"Consider usedforsecurity=False",
lineno=context.node.lineno,
fix=context.unparse(transform(context.node)),
)
elif func == "new":
args = context.call_args
Expand All @@ -81,6 +95,7 @@ def _hashlib_func(context):
text=f"Use of weak {name.upper()} hash for "
"security. Consider usedforsecurity=False",
lineno=context.node.lineno,
fix=context.unparse(transform(context.node)),
)


Expand All @@ -94,12 +109,25 @@ def _hashlib_new(context):
keywords = context.call_keywords
name = args[0] if args else keywords.get("name", None)
if isinstance(name, str) and name.lower() in WEAK_HASHES:
if len(context.node.args):
if sys.version_info >= (3, 8):
# Call(func=Attribute(value=Name(id='hashlib',
# ctx=Load()), attr='new', ctx=Load()),
# args=[Constant(value='md5', kind=None)], keywords=[])
context.node.args[0].value = "sha224"
elif isinstance(context.node.args[0], ast.Str):
# Call(func=Attribute(value=Name(id='hashlib',
# ctx=Load()), attr='new', ctx=Load()),
# args=[Str(s='md5')], keywords=[])
context.node.args[0] = ast.Str("sha224")

return bandit.Issue(
severity=bandit.MEDIUM,
confidence=bandit.HIGH,
cwe=issue.Cwe.BROKEN_CRYPTO,
text=f"Use of insecure {name.upper()} hash function.",
lineno=context.node.lineno,
fix=context.unparse(context.node),
)


Expand Down
12 changes: 12 additions & 0 deletions bandit/plugins/jinja2_templates.py
Expand Up @@ -85,6 +85,8 @@ def jinja2_autoescape_false(context):
getattr(node.value, "id", None) == "False"
or getattr(node.value, "value", None) is False
):
node.value.value = True

return bandit.Issue(
severity=bandit.HIGH,
confidence=bandit.HIGH,
Expand All @@ -94,6 +96,7 @@ def jinja2_autoescape_false(context):
"Use autoescape=True or use the "
"select_autoescape function to mitigate XSS "
"vulnerabilities.",
fix=context.unparse(context.node),
)
# found autoescape
if getattr(node, "arg", None) == "autoescape":
Expand All @@ -112,6 +115,8 @@ def jinja2_autoescape_false(context):
):
return
else:
node.value = ast.Constant(value=True, kind=None)

return bandit.Issue(
severity=bandit.HIGH,
confidence=bandit.MEDIUM,
Expand All @@ -121,14 +126,21 @@ def jinja2_autoescape_false(context):
"Ensure autoescape=True or use the "
"select_autoescape function to mitigate "
"XSS vulnerabilities.",
fix=context.unparse(context.node),
)
# We haven't found a keyword named autoescape, indicating default
# behavior
keyword = ast.keyword(
"autoescape", ast.Constant(value=True, kind=None)
)
context.node.keywords.append(keyword)

return bandit.Issue(
severity=bandit.HIGH,
confidence=bandit.HIGH,
cwe=issue.Cwe.CODE_INJECTION,
text="By default, jinja2 sets autoescape to False. Consider "
"using autoescape=True or use the select_autoescape "
"function to mitigate XSS vulnerabilities.",
fix=context.unparse(context.node),
)
3 changes: 3 additions & 0 deletions bandit/plugins/ssh_no_host_key_verification.py
Expand Up @@ -51,6 +51,8 @@ def ssh_no_host_key_verification(context):
"AutoAddPolicy",
"WarningPolicy",
]:
context.node.args[0].attr = "RejectPolicy"

return bandit.Issue(
severity=bandit.HIGH,
confidence=bandit.MEDIUM,
Expand All @@ -60,4 +62,5 @@ def ssh_no_host_key_verification(context):
lineno=context.get_lineno_for_call_arg(
"set_missing_host_key_policy"
),
fix=context.unparse(context.node),
)
16 changes: 16 additions & 0 deletions bandit/plugins/yaml_load.py
Expand Up @@ -66,11 +66,27 @@ def yaml_load(context):
not context.get_call_arg_at_position(1) == "CSafeLoader",
]
):
if getattr(context.node.func, "attr", None) == "load":
context.node.func.attr = "safe_load"
for keyword in context.node.keywords:
if keyword.arg == "Loader":
context.node.keywords.remove(keyword)
break
elif getattr(context.node.func, "id", None) == "load":
# Suggesting a switch to safe_load won't work without the import.
# Therefore switch to a SafeLoader.
# TODO: fix this
for keyword in context.node.keywords:
if keyword.arg == "Loader":
context.node.keywords.remove(keyword)
break

return bandit.Issue(
severity=bandit.MEDIUM,
confidence=bandit.HIGH,
cwe=issue.Cwe.IMPROPER_INPUT_VALIDATION,
text="Use of unsafe yaml load. Allows instantiation of"
" arbitrary objects. Consider yaml.safe_load().",
lineno=context.node.lineno,
fix=context.unparse(context.node),
)

0 comments on commit a78785e

Please sign in to comment.