Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an 'audit' goal which checks lockfiles against reported vulnerabilities. #20838

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
4 changes: 4 additions & 0 deletions pants.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ backend_packages.add = [
"pants.backend.build_files.fmt.black",
"pants.backend.python",
"pants.backend.experimental.python.packaging.pyoxidizer",
"pants.backend.experimental.audit",
"pants.backend.python.lint.autoflake",
"pants.backend.python.lint.black",
"pants.backend.python.lint.docformatter",
Expand Down Expand Up @@ -240,3 +241,6 @@ args = ["-Yrangepos", "-Xlint:unused"]

[scala-infer]
force_add_siblings_as_dependencies = false

[pypi-audit]
lockfile_vulnerability_excludes = { "python-default" = ["GHSA-w596-4wvx-j9j6"] }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Could you add a comment for why this can be ignored? It's not just an example, correct?
  • This is just me being unfamiliar with the space, but how do we end up with a GitHub advisory ID instead of a CVE or python specific one?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. I think every exception should be made with a note as to why it's being excluded. Perhaps this should be generally encouraged, so we could have a syntax like:

Suggested change
lockfile_vulnerability_excludes = { "python-default" = ["GHSA-w596-4wvx-j9j6"] }
lockfile_vulnerability_excludes = { "python-default" = [{"id": "GHSA-w596-4wvx-j9j6", "note": "This is N/A for reasons a, b and c."}] }

and potentially warn/err if there is no note.

These excludes could then be briefly mentioned, along with the note, when running pants audit for visibility.

7 changes: 7 additions & 0 deletions src/python/pants/backend/audit/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Copyright 2024 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).
python_sources()

python_tests(
name="tests",
)
Empty file.
131 changes: 131 additions & 0 deletions src/python/pants/backend/audit/audit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# Copyright 2024 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).
from __future__ import annotations

import logging
from dataclasses import dataclass
from typing import Any, ClassVar, Generic, Iterable, TypeVar

from pants.core.goals.lint import REPORT_DIR as REPORT_DIR # noqa: F401
from pants.core.goals.multi_tool_goal_helper import determine_specified_tool_ids
from pants.engine.collection import Collection
from pants.engine.console import Console
from pants.engine.engine_aware import EngineAwareParameter, EngineAwareReturnType
from pants.engine.goal import Goal, GoalSubsystem
from pants.engine.internals.selectors import Get, MultiGet
from pants.engine.rules import collect_rules, goal_rule
from pants.engine.target import FieldSet, FilteredTargets
from pants.engine.unions import UnionMembership, union

logger = logging.getLogger(__name__)
_FS = TypeVar("_FS", bound=FieldSet)


@dataclass(frozen=True)
class AuditResult:
resolve_name: str
lockfile: str
report: str


@dataclass(unsafe_hash=True)
class AuditResults(EngineAwareReturnType):
"""Zero or more AuditResult objects for a single auditor."""

results: tuple[AuditResult, ...]
auditor_name: str

def __init__(self, results: Iterable[AuditResult], *, auditor_name: str) -> None:
self.results = tuple(results)
self.auditor_name = auditor_name

def cacheable(self) -> bool:
"""Is marked uncacheable to ensure that it always renders."""
return False


@union
@dataclass(unsafe_hash=True)
class AuditRequest(Generic[_FS], EngineAwareParameter):
"""A union for targets that should be audited.

Subclass and install a member of this type to provide an auditor.
"""

field_set_type: ClassVar[type[_FS]] # type: ignore[misc]
tool_id: ClassVar[str]

field_sets: Collection[_FS]

def __init__(self, field_sets: Iterable[_FS]) -> None:
self.field_sets = Collection[_FS](field_sets)

def debug_hint(self) -> str:
return self.tool_id

def metadata(self) -> dict[str, Any]:
return {"addresses": [fs.address.spec for fs in self.field_sets]}


class AuditSubsystem(GoalSubsystem):
name = "audit"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be experimental-audit for now like we did for deploy?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We certainly could. I guess we could look at how big risk for change is there?
Deploy feels like it could be more sensitive to how it works, compared to presenting a list of applicable security reports, which could warrant this to go straight to a "stable" goal name.. for me it's fine either way, but I'm 👍🏽 for having the discussion to settle it.

help = "Run third party dependency audit tools."

@classmethod
def activated(cls, union_membership: UnionMembership) -> bool:
return AuditRequest in union_membership


class Audit(Goal):
subsystem_cls = AuditSubsystem
environment_behavior = Goal.EnvironmentBehavior.LOCAL_ONLY


@goal_rule
async def audit(
console: Console,
targets: FilteredTargets,
union_membership: UnionMembership,
) -> Audit:
request_types = union_membership[AuditRequest]
specified_ids = determine_specified_tool_ids(
"audit",
[
"pypi-audit",
],
request_types,
)
Comment on lines +91 to +97
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming this is temporary hack until a proper option is added...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, at present the only audit available is running against the pypi vulnerability database. In future we might want to use osv, or add ways to audit 3rdparty deps for languages other than python.

requests = tuple(
request_type(
request_type.field_set_type.create(target) # type: ignore[misc]
for target in targets
if (
request_type.tool_id in specified_ids
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the tool filter ought to be applied outside of the request_type constructor, to avoid creating empty AuditRequests.`

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of this is in a comprehension - if the if on line 102 doesn't pass, line 100 will not execute.

Edited to add: I double checked this with:

a = [1,2,3,4]

def foo(x):
    raise Exception(x)

evens = tuple(foo(x) for x in a if x%2 == 0)

This results in Exception: 2

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the if on line 102 doesn't pass, line 100 will not execute.

Yes, that is clear. But if the request types tool id is not in the specified_ids, we will not create any field sets, resulting in an empty AuditRequest. Moving the check outside the constructor will eliminate these no-op requests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see. Good catch!

and request_type.field_set_type.is_applicable(target) # type: ignore[misc]
)
)
for request_type in request_types
)
TansyArron marked this conversation as resolved.
Show resolved Hide resolved

all_results = await MultiGet(Get(AuditResults, {request: AuditRequest}) for request in requests)
for results in all_results:
for result in results.results:
if result.report:
sigil = console.sigil_failed()
else:
sigil = console.sigil_succeeded()
console.print_stdout(
f"\n\n{sigil} Resolve: {result.resolve_name} (from {result.lockfile})"
)
if result.report:
console.print_stdout(result.report)
else:
console.print_stdout("No vulnerabilities reported.")

return Audit(0)


def rules():
return [
*collect_rules(),
]
59 changes: 59 additions & 0 deletions src/python/pants/backend/audit/format_results.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Copyright 2024 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).
"""Functionality for formatting vulnerability results as a set of human-readable columns."""

from __future__ import annotations

from itertools import zip_longest
from typing import Any, Iterable

from pants.backend.audit.pip_audit import VulnerabilityData


def tabulate(rows: Iterable[Iterable[Any]]) -> tuple[list[str], list[int]]:
"""Return a list of formatted rows and a list of column sizes. For example::

>>> tabulate([['foobar', 2000], [0xdeadbeef]])
(['foobar 2000', '3735928559'], [10, 4])
"""
rows = [tuple(map(str, row)) for row in rows]
sizes = [max(map(len, col)) for col in zip_longest(*rows, fillvalue="")]
table = [" ".join(map(str.ljust, row, sizes)).rstrip() for row in rows]
return table, sizes


def generate_header(sizes: Iterable[int]) -> str:
"""Return a dashed header string."""
headers = []
for column_size in sizes:
column_header = "-" * column_size
headers.append(column_header)
return " ".join(headers)


def format_results(
result: dict[str, list[VulnerabilityData]],
) -> str:
"""Returns a column formatted string for a given mapping of dependencies to vulnerability
results."""
vuln_data: list[list[Any]] = []
header = ["Dependency", "ID", "Fix Versions", "Link"]
vuln_data.append(header)
for dep, vulns in result.items():
for vuln in vulns:
vuln_data.append([dep, vuln.vuln_id, vuln.fixed_in, vuln.link])
columns_string = ""

# If it's just a header, don't bother adding it to the output
if len(vuln_data) > 1:
vuln_strings, sizes = tabulate(vuln_data)

if len(vuln_data) > 0:
vuln_strings.insert(1, generate_header(sizes))

for row in vuln_strings:
if columns_string:
columns_string += "\n"
columns_string += row

return columns_string
77 changes: 77 additions & 0 deletions src/python/pants/backend/audit/pip_audit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Copyright 2024 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).
import logging
from dataclasses import dataclass
from typing import Dict, List, Optional

import requests
from packaging.requirements import Requirement

logger = logging.getLogger(__name__)


@dataclass
class VulnerabilityData:
"""Represents a vulnerability in some python package."""

vuln_id: str # A service-provided identifier for the vulnerability.
details: str # A human-readable description of the vulnerability. Can be extremely long.
fixed_in: List[
str
] # A list of versions that can be upgraded to that resolve the vulnerability.
aliases: List[str] # A set of aliases (alternative identifiers) for this result.
link: str # A link to the vulnerability info.
summary: Optional[str] # An optional short form human readable description.
withdrawn: Optional[str] # Represents whether the vulnerability has been withdrawn.

@classmethod
def from_raw_data(self, data):
return VulnerabilityData(
vuln_id=data["id"],
details=data["details"],
fixed_in=data["fixed_in"],
aliases=data["aliases"],
link=data["link"],
summary=data["summary"],
withdrawn=data["withdrawn"],
)


def audit_constraints_strings(
constraints_strings, session, excludes_ids
) -> Dict[str, List[VulnerabilityData]]:
"""Retrieve security warnings for the given constraints from the Pypi json API."""
vulnerabilities = {}
for constraint_string in constraints_strings:
requirement = Requirement(constraint_string)
specifiers = list(requirement.specifier)
if len(specifiers) != 1:
raise ValueError(
"Unexpected specifier from a lockfile (not exactly one): {}", specifiers
)
specifier = specifiers[0]
results = audit_constraints_string(
package_name=requirement.name,
version=specifier.version,
session=session,
)
if not results:
continue
vulnerabilities[str(requirement)] = [
result for result in results if result.vuln_id not in excludes_ids
]
return vulnerabilities


def audit_constraints_string(
package_name: str, version: str, session: requests.Session
) -> List[VulnerabilityData]:
url = f"https://pypi.org/pypi/{package_name}/{str(version)}/json"
response = session.get(url=url)
response.raise_for_status()
response_json = response.json()
vulnerabilities = response_json.get("vulnerabilities")
if vulnerabilities:
vulns = [VulnerabilityData.from_raw_data(vuln_data) for vuln_data in vulnerabilities]
return vulns
return []