Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable individual tests #597

Merged
merged 3 commits into from Feb 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
70 changes: 55 additions & 15 deletions bandit/core/manager.py
Expand Up @@ -7,6 +7,7 @@
import json
import logging
import os
import re
import sys
import tokenize
import traceback
Expand All @@ -21,6 +22,8 @@


LOG = logging.getLogger(__name__)
NOSEC_COMMENT = re.compile(r"#\s*nosec:?\s*(?P<tests>[^#]+)?#?")
NOSEC_COMMENT_TESTS = re.compile(r"(?:(B\d+|[a-z_]+),?)+", re.IGNORECASE)


class BanditManager:
Expand Down Expand Up @@ -308,21 +311,20 @@ def _parse_file(self, fname, fdata, new_files_list):
lines = data.splitlines()
self.metrics.begin(fname)
self.metrics.count_locs(lines)
if self.ignore_nosec:
nosec_lines = set()
else:
try:
fdata.seek(0)
tokens = tokenize.tokenize(fdata.readline)
nosec_lines = {
lineno
for toktype, tokval, (lineno, _), _, _ in tokens
if toktype == tokenize.COMMENT
and "#nosec" in tokval
or "# nosec" in tokval
}
except tokenize.TokenError:
nosec_lines = set()
# nosec_lines is a dict of line number -> set of tests to ignore
# for the line
nosec_lines = dict()
try:
fdata.seek(0)
tokens = tokenize.tokenize(fdata.readline)

if not self.ignore_nosec:
for toktype, tokval, (lineno, _), _, _ in tokens:
if toktype == tokenize.COMMENT:
nosec_lines[lineno] = _parse_nosec_comment(tokval)

except tokenize.TokenError:
pass
score = self._execute_ast_visitor(fname, data, nosec_lines)
self.scores.append(score)
self.metrics.count_issues(
Expand Down Expand Up @@ -460,3 +462,41 @@ def _find_candidate_matches(unmatched_issues, results_list):
]

return issue_candidates


def _find_test_id_from_nosec_string(extman, match):
plugin_id = extman.check_id(match)
if plugin_id:
return match
# Finding by short_id didn't work, let's check the plugin name
plugin_id = extman.get_plugin_id(match)
if not plugin_id:
# Name and short id didn't work:
LOG.warning(
"Test in comment: %s is not a test name or id, ignoring", match
)
return plugin_id # We want to return None or the string here regardless


def _parse_nosec_comment(comment):
    """Extract the tests referenced by a "# nosec" comment, if any.

    :param comment: the comment token text to inspect
    :return: None when the comment carries no nosec directive; an empty
        set for a blanket nosec (skip all tests on the line); otherwise
        the set of resolved test ids to skip
    """
    nosec = NOSEC_COMMENT.search(comment)
    if nosec is None:
        # Not a nosec comment at all.
        return None

    # An empty set signals a blanket nosec comment with no specific
    # test ids or names listed.
    test_ids = set()
    listed_tests = nosec.group("tests")
    if listed_tests:
        extman = extension_loader.MANAGER
        # Each token may be a short id ("B607") or a plugin name;
        # unresolvable tokens are dropped (a warning is logged).
        for candidate in NOSEC_COMMENT_TESTS.finditer(listed_tests):
            resolved = _find_test_id_from_nosec_string(
                extman, candidate.group(1)
            )
            if resolved:
                test_ids.add(resolved)

    return test_ids
24 changes: 19 additions & 5 deletions bandit/core/metrics.py
Expand Up @@ -19,7 +19,11 @@ class Metrics:

def __init__(self):
self.data = dict()
self.data["_totals"] = {"loc": 0, "nosec": 0}
self.data["_totals"] = {
"loc": 0,
"nosec": 0,
"skipped_tests": 0,
}

# initialize 0 totals for criteria and rank; this will be reset later
for rank in constants.RANKING:
Expand All @@ -30,21 +34,31 @@ def begin(self, fname):
"""Begin a new metric block.

This starts a new metric collection named "fname" and makes it active.

:param fname: the metrics unique name, normally the file name.
"""
self.data[fname] = {"loc": 0, "nosec": 0}
self.data[fname] = {
"loc": 0,
"nosec": 0,
"skipped_tests": 0,
}
self.current = self.data[fname]

def note_nosec(self, num=1):
    """Note a "nosec" comment.

    Increment the currently active metrics nosec count.

    :param num: number of nosecs seen, defaults to 1
    """
    counters = self.current
    counters["nosec"] = counters["nosec"] + num

def note_skipped_test(self, num=1):
    """Note a "nosec BXXX, BYYY, ..." comment.

    Increment the currently active metrics skipped_tests count.

    :param num: number of skipped_tests seen, defaults to 1
    """
    counters = self.current
    counters["skipped_tests"] = counters["skipped_tests"] + num

def count_locs(self, lines):
"""Count lines of code.

Expand Down
9 changes: 6 additions & 3 deletions bandit/core/node_visitor.py
Expand Up @@ -30,7 +30,7 @@ def __init__(self, fname, metaast, testset, debug, nosec_lines, metrics):
self.imports = set()
self.import_aliases = {}
self.tester = b_tester.BanditTester(
self.testset, self.debug, nosec_lines
self.testset, self.debug, nosec_lines, metrics
)

# in some cases we can't determine a qualified name
Expand Down Expand Up @@ -201,10 +201,13 @@ def pre_visit(self, node):
if hasattr(node, "lineno"):
self.context["lineno"] = node.lineno

if node.lineno in self.nosec_lines:
LOG.debug("skipped, nosec")
# explicitly check for empty set to skip all tests for a line
nosec_tests = self.nosec_lines.get(node.lineno)
if nosec_tests is not None and not len(nosec_tests):
LOG.debug("skipped, nosec without test number")
self.metrics.note_nosec()
return False

if hasattr(node, "col_offset"):
self.context["col_offset"] = node.col_offset

Expand Down
72 changes: 63 additions & 9 deletions bandit/core/tester.py
Expand Up @@ -15,12 +15,13 @@


class BanditTester:
def __init__(self, testset, debug, nosec_lines, metrics):
    """Build a tester over a test set.

    :param testset: the set of tests to run against each context
    :param debug: whether debug output is enabled
    :param nosec_lines: mapping of line number -> set of tests to skip
        (empty set means skip every test on that line)
    :param metrics: metrics collector used to count skipped tests
    """
    # Configuration handed in by the caller.
    self.testset = testset
    self.debug = debug
    self.nosec_lines = nosec_lines
    self.metrics = metrics
    # Accumulated state across run_tests calls.
    self.results = []
    self.last_result = None

def run_tests(self, raw_context, checktype):
"""Runs all tests for a certain type of check, for example
Expand All @@ -30,8 +31,8 @@ def run_tests(self, raw_context, checktype):

:param raw_context: Raw context dictionary
:param checktype: The type of checks to run
:param nosec_lines: Lines which should be skipped because of nosec
:return: a score based on the number and type of test results
:return: a score based on the number and type of test results with
extra metrics about nosec comments
"""

scores = {
Expand All @@ -51,12 +52,10 @@ def run_tests(self, raw_context, checktype):
else:
result = test(context)

# if we have a result, record it and update scores
if (
result is not None
and result.lineno not in self.nosec_lines
and temp_context["lineno"] not in self.nosec_lines
):
if result is not None:
nosec_tests_to_skip = self._get_nosecs_from_contexts(
temp_context, test_result=result
)

if isinstance(temp_context["filename"], bytes):
result.fname = temp_context["filename"].decode("utf-8")
Expand All @@ -71,6 +70,20 @@ def run_tests(self, raw_context, checktype):
if result.test_id == "":
result.test_id = test._test_id

# don't skip the test if there was no nosec comment
if nosec_tests_to_skip is not None:
# if the set is empty or the test id is in the set of
# tests to skip, log and increment the skip by test
# count
if not nosec_tests_to_skip or (
result.test_id in nosec_tests_to_skip
):
LOG.debug(
"skipped, nosec for test %s" % result.test_id
)
self.metrics.note_skipped_test()
continue

self.results.append(result)

LOG.debug("Issue identified by %s: %s", name, result)
Expand All @@ -80,6 +93,18 @@ def run_tests(self, raw_context, checktype):
con = constants.RANKING.index(result.confidence)
val = constants.RANKING_VALUES[result.confidence]
scores["CONFIDENCE"][con] += val
else:
nosec_tests_to_skip = self._get_nosecs_from_contexts(
temp_context
)
if (
nosec_tests_to_skip
and test._test_id in nosec_tests_to_skip
):
LOG.warning(
f"nosec encountered ({test._test_id}), but no "
f"failed test on line {temp_context['lineno']}"
)

except Exception as e:
self.report_error(name, context, e)
Expand All @@ -88,6 +113,35 @@ def run_tests(self, raw_context, checktype):
LOG.debug("Returning scores: %s", scores)
return scores

def _get_nosecs_from_contexts(self, context, test_result=None):
"""Use context and optional test result to get set of tests to skip.
:param context: temp context
:param test_result: optional test result
:return: set of tests to skip for the line based on contexts
"""
nosec_tests_to_skip = set()
base_tests = (
self.nosec_lines.get(test_result.lineno, None)
if test_result
else None
)
context_tests = self.nosec_lines.get(context["lineno"], None)

# if both are none there were no comments
# this is explicitly different from being empty.
# empty set indicates blanket nosec comment without
# individual test names or ids
if base_tests is None and context_tests is None:
nosec_tests_to_skip = None

# combine tests from current line and context line
if base_tests is not None:
nosec_tests_to_skip.update(base_tests)
if context_tests is not None:
nosec_tests_to_skip.update(context_tests)

return nosec_tests_to_skip

@staticmethod
def report_error(test, context, error):
what = "Bandit internal error running: "
Expand Down
5 changes: 5 additions & 0 deletions bandit/formatters/text.py
Expand Up @@ -171,6 +171,11 @@ def report(manager, fileobj, sev_level, conf_level, lines=-1):
"\tTotal lines skipped (#nosec): %i"
% (manager.metrics.data["_totals"]["nosec"])
)
bits.append(
"\tTotal potential issues skipped due to specifically being "
"disabled (e.g., #nosec BXXX): %i"
% (manager.metrics.data["_totals"]["skipped_tests"])
)

skipped = manager.get_skipped()
bits.append(get_metrics(manager))
Expand Down
17 changes: 17 additions & 0 deletions doc/source/config.rst
Expand Up @@ -41,6 +41,23 @@ security issue, it will not be reported::

self.process = subprocess.Popen('/bin/echo', shell=True) # nosec

Because multiple issues can be reported for the same line, specific tests may
be provided to suppress only those reports; any issues not listed are still
reported. This is useful to prevent a situation where a blanket nosec comment
also hides a different vulnerability that is later introduced on the same
line, causing the new vulnerability to be ignored.

For example, this will suppress the report of B602 and B607::

self.process = subprocess.Popen('/bin/ls *', shell=True) #nosec B602, B607

Full test names rather than the test ID may also be used.

For example, this will suppress the report of B101 and continue to report B506
as an issue::

assert yaml.load("{}") == [] # nosec assert_used

-----------------
Scanning Behavior
-----------------
Expand Down
10 changes: 9 additions & 1 deletion examples/nosec.py
Expand Up @@ -4,4 +4,12 @@
subprocess.Popen('/bin/ls *',
shell=True) #nosec (on the specific kwarg line)
subprocess.Popen('#nosec', shell=True)
subprocess.Popen('/bin/ls *', shell=True) # type: … # nosec # noqa: E501 ; pylint: disable=line-too-long
subprocess.Popen('/bin/ls *', shell=True) # type: ... # nosec # noqa: E501 ; pylint: disable=line-too-long
subprocess.Popen('/bin/ls *', shell=True) # type: ... # nosec B607 # noqa: E501 ; pylint: disable=line-too-long
subprocess.Popen('/bin/ls *', shell=True) #nosec subprocess_popen_with_shell_equals_true (on the line)
subprocess.Popen('#nosec', shell=True) # nosec B607, B602
subprocess.Popen('#nosec', shell=True) # nosec B607 B602
subprocess.Popen('/bin/ls *', shell=True) # nosec subprocess_popen_with_shell_equals_true start_process_with_partial_path
subprocess.Popen('/bin/ls *', shell=True) # type: ... # noqa: E501 ; pylint: disable=line-too-long # nosec
subprocess.Popen('#nosec', shell=True) # nosec B607, B101
subprocess.Popen('#nosec', shell=True) # nosec B602, subprocess_popen_with_shell_equals_true
4 changes: 2 additions & 2 deletions tests/functional/test_functional.py
Expand Up @@ -747,8 +747,8 @@ def test_flask_debug_true(self):

def test_nosec(self):
    """Check that nosec comments in examples/nosec.py suppress issues.

    The example file mixes blanket nosec comments with test-specific
    ones; five low-severity/high-confidence issues remain reported.
    """
    severity = {"UNDEFINED": 0, "LOW": 5, "MEDIUM": 0, "HIGH": 0}
    confidence = {"UNDEFINED": 0, "LOW": 0, "MEDIUM": 0, "HIGH": 5}
    self.check_example(
        "nosec.py",
        {"SEVERITY": severity, "CONFIDENCE": confidence},
    )

Expand Down
8 changes: 7 additions & 1 deletion tests/unit/formatters/test_text.py
Expand Up @@ -107,7 +107,11 @@ def test_report_nobaseline(self, get_issue_list):

get_issue_list.return_value = [issue_a, issue_b]

self.manager.metrics.data["_totals"] = {"loc": 1000, "nosec": 50}
self.manager.metrics.data["_totals"] = {
"loc": 1000,
"nosec": 50,
"skipped_tests": 0,
}
for category in ["SEVERITY", "CONFIDENCE"]:
for level in ["UNDEFINED", "LOW", "MEDIUM", "HIGH"]:
self.manager.metrics.data["_totals"][f"{category}.{level}"] = 1
Expand Down Expand Up @@ -153,6 +157,8 @@ def test_report_nobaseline(self, get_issue_list):
"High: 1",
"Total lines skipped ",
"(#nosec): 50",
"Total potential issues skipped due to specifically being ",
"disabled (e.g., #nosec BXXX): 0",
"Total issues (by severity)",
"Total issues (by confidence)",
"Files skipped (1)",
Expand Down