Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Suggest fixes for issues #1001

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
52 changes: 52 additions & 0 deletions bandit/core/context.py
Expand Up @@ -3,9 +3,14 @@
#
# SPDX-License-Identifier: Apache-2.0
import ast
import linecache
import sys

from bandit.core import utils

if sys.version_info < (3, 9):
import astunparse


class Context:
def __init__(self, context_object=None):
Expand Down Expand Up @@ -311,6 +316,53 @@ def is_module_imported_like(self, module):
return True
return False

def get_outer_text(self):
"""Get the text to the left and right of the node in context.

Gets the text to the left and text to the right of the node in
context. This function depends on knowing the line range, col_offset,
and end_col_offset.

:return: outer text as tuple
"""
lineno = self._context.get("linerange")[0]
end_lineno = self._context.get("linerange")[-1]
col_offset = self._context.get("col_offset")
end_col_offset = self._context.get("end_col_offset")

if self._context.get("filename") == "<stdin>":
self._context.get("file_data").seek(0)
for line_num in range(1, lineno):
self._context.get("file_data").readline()
line = self._context.get("file_data").readline()
end_line = line
if end_lineno > lineno:
for line_num in range(1, end_lineno):
self._context.get("file_data").readline()
end_line = self._context.get("file_data").readline()
else:
line = linecache.getline(self._context.get("filename"), lineno)
end_line = linecache.getline(
self._context.get("filename"), end_lineno
)

return (line[:col_offset], end_line[end_col_offset:])

def unparse(self, transformer):
"""Unparse an ast node using given transformer

:param transformer: NodeTransformer that fixes the ast
:return: node as statement string
"""
fixed_node = ast.fix_missing_locations(transformer)
outer_text = self.get_outer_text()
if sys.version_info >= (3, 9):
return outer_text[0] + ast.unparse(fixed_node) + outer_text[1]
else:
return (
outer_text[0] + astunparse.unparse(fixed_node) + outer_text[1]
)

@property
def filename(self):
return self._context.get("filename")
Expand Down
7 changes: 6 additions & 1 deletion bandit/core/issue.py
Expand Up @@ -86,6 +86,7 @@ def __init__(
test_id="",
col_offset=0,
end_col_offset=0,
fix=None,
):
self.severity = severity
self.cwe = Cwe(cwe)
Expand All @@ -102,6 +103,7 @@ def __init__(
self.col_offset = col_offset
self.end_col_offset = end_col_offset
self.linerange = []
self.fix = fix

def __str__(self):
return (
Expand Down Expand Up @@ -194,7 +196,7 @@ def get_code(self, max_lines=3, tabbed=False):
if not len(text):
break
lines.append(tmplt % (line, text))
return "".join(lines)
return "".join(lines).rstrip()

def as_dict(self, with_code=True, max_lines=3):
"""Convert the issue to a dict of values for outputting."""
Expand All @@ -214,6 +216,8 @@ def as_dict(self, with_code=True, max_lines=3):

if with_code:
out["code"] = self.get_code(max_lines=max_lines)
if self.fix:
out["fix"] = self.fix
return out

def from_dict(self, data, with_code=True):
Expand All @@ -229,6 +233,7 @@ def from_dict(self, data, with_code=True):
self.linerange = data["line_range"]
self.col_offset = data.get("col_offset", 0)
self.end_col_offset = data.get("end_col_offset", 0)
self.fix = data.get("fix")


def cwe_from_dict(data):
Expand Down
4 changes: 1 addition & 3 deletions bandit/core/node_visitor.py
Expand Up @@ -14,7 +14,7 @@
LOG = logging.getLogger(__name__)


class BanditNodeVisitor:
class BanditNodeVisitor(ast.NodeTransformer):
def __init__(
self, fname, fdata, metaast, testset, debug, nosec_lines, metrics
):
Expand Down Expand Up @@ -66,7 +66,6 @@ def visit_FunctionDef(self, node):
:param node: The node that is being inspected
:return: -
"""

self.context["function"] = node
qualname = self.namespace + "." + b_utils.get_func_name(node)
name = qualname.split(".")[-1]
Expand All @@ -87,7 +86,6 @@ def visit_Call(self, node):
:param node: The node that is being inspected
:return: -
"""

self.context["call"] = node
qualname = b_utils.get_call_name(node, self.import_aliases)
name = qualname.split(".")[-1]
Expand Down
14 changes: 14 additions & 0 deletions bandit/formatters/html.py
Expand Up @@ -271,9 +271,19 @@ def report(manager, fileobj, sev_level, conf_level, lines=-1):
<b>Line number: </b>{line_number}<br>
<b>More info: </b><a href="{url}" target="_blank">{url}</a><br>
{code}
<b>Suggested Fix:</b><br>
{fix}
{candidates}
</div>
</div>
"""

fix_block = """
<div class="fix">
<pre>
{fix}
</pre>
</div>
"""

code_block = """
Expand Down Expand Up @@ -358,6 +368,9 @@ def report(manager, fileobj, sev_level, conf_level, lines=-1):
candidates = candidate_block.format(candidate_list=candidates_str)

url = docs_utils.get_url(issue.test_id)
fix = (
fix_block.format(fix=html_escape(issue.fix)) if issue.fix else None
)
results_str += issue_block.format(
issue_no=index,
issue_class=f"issue-sev-{issue.severity.lower()}",
Expand All @@ -373,6 +386,7 @@ def report(manager, fileobj, sev_level, conf_level, lines=-1):
candidates=candidates,
url=url,
line_number=issue.lineno,
fix=fix,
)

# build the metrics string to insert in the report
Expand Down
7 changes: 7 additions & 0 deletions bandit/formatters/screen.py
Expand Up @@ -146,6 +146,13 @@ def _output_issue_str(
[indent + line for line in issue.get_code(lines, True).split("\n")]
)

if issue.fix:
bits.append(
f"{indent} {COLOR[issue.severity]}"
f"Suggested Fix:{COLOR['DEFAULT']}"
)
bits.append(f"\t{issue.fix}")

return "\n".join([bit for bit in bits])


Expand Down
4 changes: 4 additions & 0 deletions bandit/formatters/text.py
Expand Up @@ -113,6 +113,10 @@ def _output_issue_str(
[indent + line for line in issue.get_code(lines, True).split("\n")]
)

if issue.fix:
bits.append(f"{indent} Suggested Fix:")
bits.append(f"\t{issue.fix}")

return "\n".join([bit for bit in bits])


Expand Down
3 changes: 3 additions & 0 deletions bandit/plugins/app_debug.py
Expand Up @@ -52,6 +52,8 @@ def flask_debug_true(context):
if context.is_module_imported_like("flask"):
if context.call_function_name_qual.endswith(".run"):
if context.check_call_arg_value("debug", "True"):
context.node.keywords[0].value.value = False

return bandit.Issue(
severity=bandit.HIGH,
confidence=bandit.MEDIUM,
Expand All @@ -60,4 +62,5 @@ def flask_debug_true(context):
"which exposes the Werkzeug debugger and allows "
"the execution of arbitrary code.",
lineno=context.get_lineno_for_call_arg("debug"),
fix=context.unparse(context.node),
)
3 changes: 3 additions & 0 deletions bandit/plugins/crypto_request_no_cert_validation.py
Expand Up @@ -65,11 +65,14 @@ def request_with_no_cert_validation(context):
and context.call_function_name in HTTPX_ATTRS
):
if context.check_call_arg_value("verify", "False"):
context.node.keywords[0].value.value = True

return bandit.Issue(
severity=bandit.HIGH,
confidence=bandit.HIGH,
cwe=issue.Cwe.IMPROPER_CERT_VALIDATION,
text=f"Call to {qualname} with verify=False disabling SSL "
"certificate checks, security issue.",
lineno=context.get_lineno_for_call_arg("verify"),
fix=context.unparse(context.node),
)
28 changes: 28 additions & 0 deletions bandit/plugins/hashlib_insecure_functions.py
Expand Up @@ -41,6 +41,7 @@
CWE information added

""" # noqa: E501
import ast
import sys

import bandit
Expand All @@ -51,6 +52,18 @@
WEAK_HASHES = ("md4", "md5", "sha", "sha1")


def transform(node):
found = False
for keyword in node.keywords:
if keyword.arg == "usedforsecurity":
keyword.value.value = False
found = True
if not found:
keyword = ast.keyword("usedforsecurity", ast.Constant(False))
node.keywords.append(keyword)
return node


def _hashlib_func(context):
if isinstance(context.call_function_name_qual, str):
qualname_list = context.call_function_name_qual.split(".")
Expand All @@ -68,6 +81,7 @@ def _hashlib_func(context):
text=f"Use of weak {func.upper()} hash for security. "
"Consider usedforsecurity=False",
lineno=context.node.lineno,
fix=context.unparse(transform(context.node)),
)
elif func == "new":
args = context.call_args
Expand All @@ -81,6 +95,7 @@ def _hashlib_func(context):
text=f"Use of weak {name.upper()} hash for "
"security. Consider usedforsecurity=False",
lineno=context.node.lineno,
fix=context.unparse(transform(context.node)),
)


Expand All @@ -94,12 +109,25 @@ def _hashlib_new(context):
keywords = context.call_keywords
name = args[0] if args else keywords.get("name", None)
if isinstance(name, str) and name.lower() in WEAK_HASHES:
if len(context.node.args):
if sys.version_info >= (3, 8):
# Call(func=Attribute(value=Name(id='hashlib',
# ctx=Load()), attr='new', ctx=Load()),
# args=[Constant(value='md5', kind=None)], keywords=[])
context.node.args[0].value = "sha224"
elif isinstance(context.node.args[0], ast.Str):
# Call(func=Attribute(value=Name(id='hashlib',
# ctx=Load()), attr='new', ctx=Load()),
# args=[Str(s='md5')], keywords=[])
context.node.args[0] = ast.Str("sha224")

return bandit.Issue(
severity=bandit.MEDIUM,
confidence=bandit.HIGH,
cwe=issue.Cwe.BROKEN_CRYPTO,
text=f"Use of insecure {name.upper()} hash function.",
lineno=context.node.lineno,
fix=context.unparse(context.node),
)


Expand Down
12 changes: 12 additions & 0 deletions bandit/plugins/jinja2_templates.py
Expand Up @@ -85,6 +85,8 @@ def jinja2_autoescape_false(context):
getattr(node.value, "id", None) == "False"
or getattr(node.value, "value", None) is False
):
node.value.value = True

return bandit.Issue(
severity=bandit.HIGH,
confidence=bandit.HIGH,
Expand All @@ -94,6 +96,7 @@ def jinja2_autoescape_false(context):
"Use autoescape=True or use the "
"select_autoescape function to mitigate XSS "
"vulnerabilities.",
fix=context.unparse(context.node),
)
# found autoescape
if getattr(node, "arg", None) == "autoescape":
Expand All @@ -112,6 +115,8 @@ def jinja2_autoescape_false(context):
):
return
else:
node.value = ast.Constant(value=True, kind=None)

return bandit.Issue(
severity=bandit.HIGH,
confidence=bandit.MEDIUM,
Expand All @@ -121,14 +126,21 @@ def jinja2_autoescape_false(context):
"Ensure autoescape=True or use the "
"select_autoescape function to mitigate "
"XSS vulnerabilities.",
fix=context.unparse(context.node),
)
# We haven't found a keyword named autoescape, indicating default
# behavior
keyword = ast.keyword(
"autoescape", ast.Constant(value=True, kind=None)
)
context.node.keywords.append(keyword)

return bandit.Issue(
severity=bandit.HIGH,
confidence=bandit.HIGH,
cwe=issue.Cwe.CODE_INJECTION,
text="By default, jinja2 sets autoescape to False. Consider "
"using autoescape=True or use the select_autoescape "
"function to mitigate XSS vulnerabilities.",
fix=context.unparse(context.node),
)
3 changes: 3 additions & 0 deletions bandit/plugins/ssh_no_host_key_verification.py
Expand Up @@ -51,6 +51,8 @@ def ssh_no_host_key_verification(context):
"AutoAddPolicy",
"WarningPolicy",
]:
context.node.args[0].attr = "RejectPolicy"

return bandit.Issue(
severity=bandit.HIGH,
confidence=bandit.MEDIUM,
Expand All @@ -60,4 +62,5 @@ def ssh_no_host_key_verification(context):
lineno=context.get_lineno_for_call_arg(
"set_missing_host_key_policy"
),
fix=context.unparse(context.node),
)
16 changes: 16 additions & 0 deletions bandit/plugins/yaml_load.py
Expand Up @@ -66,11 +66,27 @@ def yaml_load(context):
not context.get_call_arg_at_position(1) == "CSafeLoader",
]
):
if getattr(context.node.func, "attr", None) == "load":
context.node.func.attr = "safe_load"
for keyword in context.node.keywords:
if keyword.arg == "Loader":
context.node.keywords.remove(keyword)
break
elif getattr(context.node.func, "id", None) == "load":
# Suggesting a switch to safe_load won't work without the import.
# Therefore switch to a SafeLoader.
# TODO: fix this
for keyword in context.node.keywords:
if keyword.arg == "Loader":
context.node.keywords.remove(keyword)
break

return bandit.Issue(
severity=bandit.MEDIUM,
confidence=bandit.HIGH,
cwe=issue.Cwe.IMPROPER_INPUT_VALIDATION,
text="Use of unsafe yaml load. Allows instantiation of"
" arbitrary objects. Consider yaml.safe_load().",
lineno=context.node.lineno,
fix=context.unparse(context.node),
)