diff --git a/Dockerfile b/Dockerfile
index 00edffa1..9fd99812 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -10,7 +10,7 @@ RUN cd /app && python3 -m pip install poetry==1.1.13 pipenv==2022.6.7
# Install this project dependencies
COPY . /app
-RUN cd /app && python3 -m pip install -e .
+RUN cd /app && python3 -m pip install -e .[github]
ENV LC_ALL=C.UTF-8
ENV LANG=C.UTF-8
diff --git a/action.yml b/action.yml
index 9a6f0158..0c04b960 100644
--- a/action.yml
+++ b/action.yml
@@ -6,6 +6,14 @@ inputs:
description: 'PyUp.io API key'
required: false
default: ''
+ create-pr:
+ description: 'Create a PR if Safety finds any vulnerabilities. Only the `file` scan mode is currently supported - combine with Cron based actions to get automatic notifications of new vulnerabilities.'
+ required: false
+ default: ''
+ create-issue:
+ description: 'Create an issue if Safety finds any vulnerabilities. Only the `file` scan mode is currently supported - combine with Cron based actions to get automatic notifications of new vulnerabilities.'
+ required: false
+ default: ''
scan:
description: 'Scan mode to use. One of auto / docker / env / file (defaults to auto)'
required: false
@@ -30,6 +38,9 @@ inputs:
description: '[Advanced] Any additional arguments to pass to Safety'
required: false
default: ''
+ repo-token:
+ required: false
+ default: ''
outputs:
cli-output:
@@ -43,6 +54,8 @@ runs:
env:
SAFETY_API_KEY: ${{ inputs.api-key }}
SAFETY_ACTION: true
+ SAFETY_ACTION_CREATE_PR: ${{ inputs.create-pr }}
+ SAFETY_ACTION_CREATE_ISSUE: ${{ inputs.create-issue }}
SAFETY_ACTION_SCAN: ${{ inputs.scan }}
SAFETY_ACTION_DOCKER_IMAGE: ${{ inputs.docker-image }}
SAFETY_ACTION_REQUIREMENTS: ${{ inputs.requirements }}
@@ -50,6 +63,7 @@ runs:
SAFETY_ACTION_OUTPUT_FORMAT: ${{ inputs.output-format }}
SAFETY_ACTION_ARGS: ${{ inputs.args }}
SAFETY_ACTION_FORMAT: true
+ GITHUB_TOKEN: ${{ inputs.repo-token }}
COLUMNS: 120
branding:
diff --git a/entrypoint.sh b/entrypoint.sh
index 4ccc38f7..fc5a0bb5 100755
--- a/entrypoint.sh
+++ b/entrypoint.sh
@@ -65,6 +65,43 @@ if [ "${SAFETY_ACTION_SCAN}" = "auto" ]; then
fi
fi
+# remediation mode
+if [ "${SAFETY_ACTION_CREATE_PR}" = "true" ] && [ "${SAFETY_ACTION_CREATE_ISSUE}" = "true" ]; then
+ echo "[Safety Action] Can only create issues or PRs, not both."
+ exit 1
+fi
+
+if [ "${SAFETY_ACTION_CREATE_PR}" = "true" ] || [ "${SAFETY_ACTION_CREATE_ISSUE}" = "true" ]; then
+ if [ "${SAFETY_ACTION_SCAN}" != "file" ]; then
+ echo "[Safety Action] Creating PRs / issues is only supported when scanning a requirements file."
+ exit 1
+ fi
+
+ # TODO: Add info to env vars for telemetry...
+
+ # Build up a list of requirements files, or use SAFETY_ACTION_REQUIREMENTS if that's set.
+ # This will be moved into Safety proper in the future.
+ requirement_files=()
+ if [ -z "${SAFETY_ACTION_REQUIREMENTS}" ]; then
+ readarray -d '' matches < <(find . -type f -name requirements.txt -print0)
+ for match in ${matches[@]}; do
+ requirement_files+=("-r" "${match}")
+ done
+ else
+ requirement_files=("-r" "${SAFETY_ACTION_REQUIREMENTS}")
+ fi
+
+ alert_action="github-pr"
+ if [ "${SAFETY_ACTION_CREATE_ISSUE}" = "true" ]; then
+ alert_action="github-issue"
+ fi
+
+ # Continue on error is set because we're using Safety's output here for further processing.
+ python -m safety check "${requirement_files[@]}" --continue-on-error --output=json ${SAFETY_ACTION_ARGS} | python -m safety alert "${alert_action}" --repo "${GITHUB_REPOSITORY}" --token "${GITHUB_TOKEN}" --base-url "${GITHUB_API_URL}"
+
+ exit 0
+fi
+
if [ "${SAFETY_ACTION_SCAN}" = "docker" ]; then
if [[ "${SAFETY_ACTION_DOCKER_IMAGE}" == "" ]]; then
SAFETY_OS_DESCRIPTION="${SAFETY_OS_DESCRIPTION} docker_image_scan"
diff --git a/safety/alerts/__init__.py b/safety/alerts/__init__.py
new file mode 100644
index 00000000..1ebbd6ca
--- /dev/null
+++ b/safety/alerts/__init__.py
@@ -0,0 +1,42 @@
+import sys
+import json
+from typing import Any
+import click
+
+from dataclasses import dataclass
+
+from . import github
+from safety.util import SafetyPolicyFile
+
+@dataclass
+class Alert:
+ report: Any
+ key: str
+ policy: Any = None
+ requirements_files: Any = None
+
+@click.group(help="Send alerts based on the results of a Safety scan.")
+@click.option('--check-report', help='JSON output of Safety Check to work with.', type=click.File('r'), default=sys.stdin)
+@click.option("--policy-file", type=SafetyPolicyFile(), default='.safety-policy.yml',
+ help="Define the policy file to be used")
+@click.option("--key", envvar="SAFETY_API_KEY",
+ help="API Key for pyup.io's vulnerability database. Can be set as SAFETY_API_KEY "
+ "environment variable.", required=True)
+@click.pass_context
+def alert(ctx, check_report, policy_file, key):
+ with check_report:
+ # TODO: This breaks --help for subcommands
+ try:
+ safety_report = json.load(check_report)
+ except json.decoder.JSONDecodeError as e:
+ click.secho("Error decoding input JSON: {}".format(e.msg), fg='red')
+ sys.exit(1)
+
+ if not 'report_meta' in safety_report:
+ click.secho("You must pass in a valid Safety Check JSON report", fg='red')
+ sys.exit(1)
+
+ ctx.obj = Alert(report=safety_report, policy=policy_file if policy_file else {}, key=key)
+
+alert.add_command(github.github_pr)
+alert.add_command(github.github_issue)
diff --git a/safety/alerts/github.py b/safety/alerts/github.py
new file mode 100644
index 00000000..9ea33006
--- /dev/null
+++ b/safety/alerts/github.py
@@ -0,0 +1,298 @@
+import re
+import sys
+
+import click
+
+try:
+ import github as pygithub
+except ImportError:
+ pygithub = None
+
+from . import utils, requirements
+
+def create_branch(repo, base_branch, new_branch):
+ ref = repo.get_git_ref("heads/" + base_branch)
+ repo.create_git_ref(ref="refs/heads/" + new_branch, sha=ref.object.sha)
+
+def delete_branch(repo, branch):
+ ref = repo.get_git_ref(f"heads/{branch}")
+ ref.delete()
+
+@click.command()
+@click.option('--repo', help='GitHub standard repo path (eg, my-org/my-project)')
+@click.option('--token', help='GitHub Access Token')
+@click.option('--base-url', help='Optional custom Base URL, if you\'re using GitHub enterprise', default=None)
+@click.pass_obj
+@utils.require_files_report
+def github_pr(obj, repo, token, base_url):
+ """
+ Create a GitHub PR to fix any vulnerabilities using PyUp's remediation data.
+
+ Normally, this is run by a GitHub action. If you're running this manually, ensure that your local repo is up to date and on HEAD - otherwise you'll see strange results.
+ """
+ if pygithub is None:
+ click.secho("pygithub is not installed. Did you install Safety with GitHub support? Try pip install safety[github]", fg='red')
+ sys.exit(1)
+
+ # TODO: Improve access to our config in future.
+ branch_prefix = obj.policy.get('alert', {}).get('security', {}).get('github-pr', {}).get('branch-prefix', 'pyup/')
+ pr_prefix = obj.policy.get('alert', {}).get('security', {}).get('github-pr', {}).get('pr-prefix', '[PyUp] ')
+ assignees = obj.policy.get('alert', {}).get('security', {}).get('github-pr', {}).get('assignees', [])
+ labels = obj.policy.get('alert', {}).get('security', {}).get('github-pr', {}).get('labels', ['security'])
+ label_severity = obj.policy.get('alert', {}).get('security', {}).get('github-pr', {}).get('label-severity', True)
+ ignore_cvss_severity_below = obj.policy.get('alert', {}).get('security', {}).get('github-pr', {}).get('ignore-cvss-severity-below', 0)
+ ignore_cvss_unknown_severity = obj.policy.get('alert', {}).get('security', {}).get('github-pr', {}).get('ignore-cvss-unknown-severity', False)
+
+ gh = pygithub.Github(token, **({"base_url": base_url} if base_url else {}))
+ repo = gh.get_repo(repo)
+ try:
+ self_user = gh.get_user().login
+ except pygithub.GithubException:
+ # If we're using a token from an action (or integration) we can't call `get_user()`. Fall back
+ # to assuming we're running under an action
+ self_user = "web-flow"
+
+ pulls = repo.get_pulls(state='open', sort='created', base=repo.default_branch)
+ pending_updates = set(obj.report['remediations'].keys())
+
+ # TODO: Refactor this loop into a fn to iterate over remediations nicely
+ for name, contents in obj.requirements_files.items():
+ raw_contents = contents
+ contents = contents.decode('utf-8') # TODO - encoding?
+ parsed_req_file = requirements.RequirementFile(name, contents)
+ for pkg, remediation in obj.report['remediations'].items():
+ if remediation['recommended_version'] is None:
+ print(f"The GitHub PR alerter only currently supports remediations that have a recommended_version: {pkg}")
+ continue
+
+ # We have a single remediation that can have multiple vulnerabilities
+ vulns = [x for x in obj.report['vulnerabilities'] if x['package_name'] == pkg and x['analyzed_version'] == remediation['current_version']]
+
+ if ignore_cvss_unknown_severity and all(x['severity'] is None for x in vulns):
+ print("All vulnerabilities have unknown severity, and ignore_cvss_unknown_severity is set.")
+ continue
+
+ highest_base_score = 0
+ for vuln in vulns:
+ if vuln['severity'] is not None:
+ highest_base_score = max(highest_base_score, (vuln['severity'].get('cvssv3', {}) or {}).get('base_score', 10))
+
+ if ignore_cvss_severity_below:
+ at_least_one_match = False
+ for vuln in vulns:
+ # Consider a None severity as a match, since it's controlled by a different flag
+ # If we can't find a base_score but we have severity data, assume it's critical for now.
+ if vuln['severity'] is None or (vuln['severity'].get('cvssv3', {}) or {}).get('base_score', 10) >= ignore_cvss_severity_below:
+ at_least_one_match = True
+
+ if not at_least_one_match:
+ print(f"None of the vulnerabilities found have a score greater than or equal to the ignore_cvss_severity_below of {ignore_cvss_severity_below}")
+ continue
+
+ for parsed_req in parsed_req_file.requirements:
+ if parsed_req.name == pkg:
+ updated_contents = parsed_req.update_version(contents, remediation['recommended_version'])
+ pending_updates.discard(pkg)
+
+ new_branch = branch_prefix + utils.generate_branch_name(pkg, remediation)
+ skip_create = False
+
+ # Few possible cases:
+ # 1. No existing PRs exist for this change (don't need to handle)
+ # 2. An existing PR exists, and it's out of date (eg, recommended 0.5.1 and we want 0.5.2)
+                # 3. An existing PR exists, and it's not mergeable anymore (eg, needs a rebase)
+ # 4. An existing PR exists, and everything's up to date.
+ # 5. An existing PR exists, but it's not needed anymore (perhaps we've been updated to a later version)
+ # 6. No existing PRs exist, but a branch does exist (perhaps the PR was closed but a stale branch left behind)
+ # In any case, we only act if we've been the only committer to the branch.
+ for pr in pulls:
+ if not pr.head.ref.startswith(branch_prefix):
+ continue
+
+ authors = [commit.committer.login for commit in pr.get_commits()]
+ only_us = all([x == self_user for x in authors])
+
+ try:
+ _, pr_pkg, pr_ver = pr.head.ref.split('/')
+ except ValueError:
+ # It's possible that something weird has manually been done, so skip that
+ print('Found an invalid branch name on an open PR, that matches our prefix. Skipping.')
+ continue
+
+ if pr_pkg != pkg:
+ continue
+
+ # Case 4
+ if pr_pkg == pkg and pr_ver == remediation['recommended_version'] and pr.mergeable:
+ print(f"An up to date PR #{pr.number} for {pkg} was found, no action will be taken.")
+
+ skip_create = True
+ continue
+
+ if not only_us:
+ print(f"There are other committers on the PR #{pr.number} for {pkg}. No further action will be taken.")
+ continue
+
+ # Case 2
+ if pr_pkg == pkg and pr_ver != remediation['recommended_version']:
+                    print(f"Closing stale PR #{pr.number} for {pkg} as a newer recommended version became available.")
+
+ pr.create_issue_comment("This PR has been replaced, since a newer recommended version became available.")
+ pr.edit(state='closed')
+ delete_branch(repo, pr.head.ref)
+
+ # Case 3
+ if not pr.mergeable:
+                    print(f"Closing PR #{pr.number} for {pkg} as it has become unmergeable and we were the only committer")
+
+                    pr.create_issue_comment("This PR has been replaced since it became unmergeable.")
+ pr.edit(state='closed')
+ delete_branch(repo, pr.head.ref)
+
+ if updated_contents == contents:
+ print(f"Couldn't update {pkg} to {remediation['recommended_version']}")
+ continue
+
+ if skip_create:
+ continue
+
+ try:
+ create_branch(repo, repo.default_branch, new_branch)
+ except pygithub.GithubException as e:
+ if e.data['message'] == "Reference already exists":
+ # There might be a stale branch. If the bot is the only committer, nuke it.
+ comparison = repo.compare(repo.default_branch, new_branch)
+ authors = [commit.committer.login for commit in comparison.commits]
+ only_us = all([x == self_user for x in authors])
+
+ if only_us:
+ delete_branch(repo, new_branch)
+ create_branch(repo, repo.default_branch, new_branch)
+ else:
+ print(f"The branch '{new_branch}' already exists - but there is no matching PR and this branch has committers other than us. This remediation will be skipped.")
+ continue
+ else:
+ raise e
+
+ try:
+ repo.update_file(
+ path=name,
+ message=utils.generate_commit_message(pkg, remediation),
+ content=updated_contents,
+ branch=new_branch,
+ sha=utils.git_sha1(raw_contents)
+ )
+ except pygithub.GithubException as e:
+ if "does not match" in e.data['message']:
+ click.secho(f"GitHub blocked a commit on our branch to the requirements file, {name}, as the local hash we computed didn't match the version on {repo.default_branch}. Make sure you're running safety against the latest code on your default branch.", fg='red')
+ continue
+ else:
+ raise e
+
+ pr = repo.create_pull(title=pr_prefix + utils.generate_title(pkg, remediation, vulns), body=utils.generate_body(pkg, remediation, vulns, api_key=obj.key), head=new_branch, base=repo.default_branch)
+ print(f"Created Pull Request to update {pkg}")
+
+ for assignee in assignees:
+ pr.add_to_assignees(assignee)
+
+ for label in labels:
+ pr.add_to_labels(label)
+
+ if label_severity:
+ score_as_label = utils.cvss3_score_to_label(highest_base_score)
+ if score_as_label:
+ pr.add_to_labels(score_as_label)
+
+ if len(pending_updates) > 0:
+ click.secho("The following remediations were not followed: {}".format(', '.join(pending_updates)), fg='red')
+
+@click.command()
+@click.option('--repo', help='GitHub standard repo path (eg, my-org/my-project)')
+@click.option('--token', help='GitHub Access Token')
+@click.option('--base-url', help='Optional custom Base URL, if you\'re using GitHub enterprise', default=None)
+@click.pass_obj
+@utils.require_files_report # TODO: For now, it can be removed in the future to support env scans.
+def github_issue(obj, repo, token, base_url):
+ """
+ Create a GitHub Issue for any vulnerabilities found using PyUp's remediation data.
+
+ Normally, this is run by a GitHub action. If you're running this manually, ensure that your local repo is up to date and on HEAD - otherwise you'll see strange results.
+ """
+ if pygithub is None:
+ click.secho("pygithub is not installed. Did you install Safety with GitHub support? Try pip install safety[github]", fg='red')
+ sys.exit(1)
+
+ # TODO: Improve access to our config in future.
+ issue_prefix = obj.policy.get('alert', {}).get('security', {}).get('github-issue', {}).get('issue-prefix', '[PyUp] ')
+ assignees = obj.policy.get('alert', {}).get('security', {}).get('github-issue', {}).get('assignees', [])
+ labels = obj.policy.get('alert', {}).get('security', {}).get('github-issue', {}).get('labels', ['security'])
+ label_severity = obj.policy.get('alert', {}).get('security', {}).get('github-pr', {}).get('label-severity', True)
+ ignore_cvss_severity_below = obj.policy.get('alert', {}).get('security', {}).get('github-pr', {}).get('ignore-cvss-severity-below', 0)
+ ignore_cvss_unknown_severity = obj.policy.get('alert', {}).get('security', {}).get('github-pr', {}).get('ignore-cvss-unknown-severity', False)
+
+ gh = pygithub.Github(token, **({"base_url": base_url} if base_url else {}))
+ repo = gh.get_repo(repo)
+
+ issues = list(repo.get_issues(state='open', sort='created'))
+ ISSUE_TITLE_REGEX = re.escape(issue_prefix) + r"Security Vulnerability in (.+)"
+
+ for name, contents in obj.requirements_files.items():
+ raw_contents = contents
+ contents = contents.decode('utf-8') # TODO - encoding?
+ parsed_req_file = requirements.RequirementFile(name, contents)
+ for pkg, remediation in obj.report['remediations'].items():
+ if remediation['recommended_version'] is None:
+ print(f"The GitHub Issue alerter only currently supports remediations that have a recommended_version: {pkg}")
+ continue
+
+ # We have a single remediation that can have multiple vulnerabilities
+ vulns = [x for x in obj.report['vulnerabilities'] if x['package_name'] == pkg and x['analyzed_version'] == remediation['current_version']]
+
+ if ignore_cvss_unknown_severity and all(x['severity'] is None for x in vulns):
+ print("All vulnerabilities have unknown severity, and ignore_cvss_unknown_severity is set.")
+ continue
+
+ highest_base_score = 0
+ for vuln in vulns:
+ if vuln['severity'] is not None:
+ highest_base_score = max(highest_base_score, (vuln['severity'].get('cvssv3', {}) or {}).get('base_score', 10))
+
+ if ignore_cvss_severity_below:
+ at_least_one_match = False
+ for vuln in vulns:
+ # Consider a None severity as a match, since it's controlled by a different flag
+ # If we can't find a base_score but we have severity data, assume it's critical for now.
+ if vuln['severity'] is None or (vuln['severity'].get('cvssv3', {}) or {}).get('base_score', 10) >= ignore_cvss_severity_below:
+ at_least_one_match = True
+
+ if not at_least_one_match:
+ print(f"None of the vulnerabilities found have a score greater than or equal to the ignore_cvss_severity_below of {ignore_cvss_severity_below}")
+ continue
+
+ for parsed_req in parsed_req_file.requirements:
+ if parsed_req.name == pkg:
+ skip = False
+ for issue in issues:
+ match = re.match(ISSUE_TITLE_REGEX, issue.title)
+ if match:
+ if match.group(1) == pkg:
+ skip = True
+
+ # For now, we just skip issues if they already exist - we don't try and update them.
+ if skip:
+ print(f"An issue already exists for {pkg} - skipping")
+ continue
+
+ pr = repo.create_issue(title=issue_prefix + utils.generate_issue_title(pkg, remediation), body=utils.generate_issue_body(pkg, remediation, vulns, api_key=obj.key))
+ print(f"Created issue to update {pkg}")
+
+ for assignee in assignees:
+ pr.add_to_assignees(assignee)
+
+ for label in labels:
+ pr.add_to_labels(label)
+
+ if label_severity:
+ score_as_label = utils.cvss3_score_to_label(highest_base_score)
+ if score_as_label:
+ pr.add_to_labels(score_as_label)
diff --git a/safety/alerts/requirements.py b/safety/alerts/requirements.py
new file mode 100644
index 00000000..400b7050
--- /dev/null
+++ b/safety/alerts/requirements.py
@@ -0,0 +1,339 @@
+from __future__ import unicode_literals
+
+from packaging.version import parse as parse_version
+from packaging.specifiers import SpecifierSet
+import requests
+
+from datetime import datetime
+from dparse import parse, parser, updater, filetypes
+from dparse.dependencies import Dependency
+from dparse.parser import setuptools_parse_requirements_backport as parse_requirements
+
+
+class RequirementFile(object):
+ def __init__(self, path, content, sha=None):
+ self.path = path
+ self.content = content
+ self.sha = sha
+ self._requirements = None
+ self._other_files = None
+ self._is_valid = None
+ self.is_pipfile = False
+ self.is_pipfile_lock = False
+ self.is_setup_cfg = False
+
+ def __str__(self):
+ return "RequirementFile(path='{path}', sha='{sha}', content='{content}')".format(
+ path=self.path,
+ content=self.content[:30] + "[truncated]" if len(self.content) > 30 else self.content,
+ sha=self.sha
+ )
+
+ @property
+ def is_valid(self):
+ if self._is_valid is None:
+ self._parse()
+ return self._is_valid
+
+ @property
+ def requirements(self):
+ if not self._requirements:
+ self._parse()
+ return self._requirements
+
+ @property
+ def other_files(self):
+ if not self._other_files:
+ self._parse()
+ return self._other_files
+
+ @staticmethod
+ def parse_index_server(line):
+ return parser.Parser.parse_index_server(line)
+
+ def _hash_parser(self, line):
+ return parser.Parser.parse_hashes(line)
+
+ def _parse_requirements_txt(self):
+ self.parse_dependencies(filetypes.requirements_txt)
+
+ def _parse_conda_yml(self):
+ self.parse_dependencies(filetypes.conda_yml)
+
+ def _parse_tox_ini(self):
+ self.parse_dependencies(filetypes.tox_ini)
+
+ def _parse_pipfile(self):
+ self.parse_dependencies(filetypes.pipfile)
+ self.is_pipfile = True
+
+ def _parse_pipfile_lock(self):
+ self.parse_dependencies(filetypes.pipfile_lock)
+ self.is_pipfile_lock = True
+
+ def _parse_setup_cfg(self):
+ self.parse_dependencies(filetypes.setup_cfg)
+ self.is_setup_cfg = True
+
+ def _parse(self):
+ self._requirements, self._other_files = [], []
+ if self.path.endswith('.yml') or self.path.endswith(".yaml"):
+ self._parse_conda_yml()
+ elif self.path.endswith('.ini'):
+ self._parse_tox_ini()
+ elif self.path.endswith("Pipfile"):
+ self._parse_pipfile()
+ elif self.path.endswith("Pipfile.lock"):
+ self._parse_pipfile_lock()
+ elif self.path.endswith('setup.cfg'):
+ self._parse_setup_cfg()
+ else:
+ self._parse_requirements_txt()
+ self._is_valid = len(self._requirements) > 0 or len(self._other_files) > 0
+
+ def parse_dependencies(self, file_type):
+ result = parse(
+ self.content,
+ path=self.path,
+ sha=self.sha,
+ file_type=file_type,
+ marker=(
+ ("pyup: ignore file", "pyup:ignore file"), # file marker
+ ("pyup: ignore", "pyup:ignore"), # line marker
+ )
+ )
+ for dep in result.dependencies:
+ req = Requirement(
+ name=dep.name,
+ specs=dep.specs,
+ line=dep.line,
+ lineno=dep.line_numbers[0] if dep.line_numbers else 0,
+ extras=dep.extras,
+ file_type=file_type,
+ )
+ req.index_server = dep.index_server
+ if self.is_pipfile:
+ req.pipfile = self.path
+ req.hashes = dep.hashes
+ self._requirements.append(req)
+ self._other_files = result.resolved_files
+
+ def iter_lines(self, lineno=0):
+ for line in self.content.splitlines()[lineno:]:
+ yield line
+
+ @classmethod
+ def resolve_file(cls, file_path, line):
+ return parser.Parser.resolve_file(file_path, line)
+
+
+class Requirement(object):
+ def __init__(self, name, specs, line, lineno, extras, file_type):
+ self.name = name
+ self.key = name.lower()
+ self.specs = specs
+ self.line = line
+ self.lineno = lineno
+ self.index_server = None
+ self.extras = extras
+ self.hashes = []
+ self.file_type = file_type
+ self.pipfile = None
+
+ self.hashCmp = (
+ self.key,
+ self.specs,
+ frozenset(self.extras),
+ )
+
+ self._is_insecure = None
+ self._changelog = None
+
+ if len(self.specs._specs) == 1 and next(iter(self.specs._specs))._spec[0] == "~=":
+ # convert compatible releases to something more easily consumed,
+ # e.g. '~=1.2.3' is equivalent to '>=1.2.3,<1.3.0', while '~=1.2'
+ # is equivalent to '>=1.2,<2.0'
+ min_version = next(iter(self.specs._specs))._spec[1]
+ max_version = list(parse_version(min_version).release)
+ max_version[-1] = 0
+ max_version[-2] = max_version[-2] + 1
+ max_version = '.'.join(str(x) for x in max_version)
+
+ self.specs = SpecifierSet('>=%s,<%s' % (min_version, max_version))
+
+ def __eq__(self, other):
+ return (
+ isinstance(other, Requirement) and
+ self.hashCmp == other.hashCmp
+ )
+
+ def __ne__(self, other):
+ return not self == other
+
+ def __str__(self):
+ return "Requirement.parse({line}, {lineno})".format(line=self.line, lineno=self.lineno)
+
+ def __repr__(self):
+ return self.__str__()
+
+ @property
+ def is_pinned(self):
+ if len(self.specs._specs) == 1 and next(iter(self.specs._specs))._spec[0] == "==":
+ return True
+ return False
+
+ @property
+ def is_open_ranged(self):
+ if len(self.specs._specs) == 1 and next(iter(self.specs._specs))._spec[0] == ">=":
+ return True
+ return False
+
+ @property
+ def is_ranged(self):
+ return len(self.specs._specs) >= 1 and not self.is_pinned
+
+ @property
+ def is_loose(self):
+ return len(self.specs._specs) == 0
+
+ @staticmethod
+ def convert_semver(version):
+ semver = {'major': 0, "minor": 0, "patch": 0}
+ version = version.split(".")
+        # don't be overly clever here. repetition makes it more readable and works exactly how
+ # it is supposed to
+ try:
+ semver['major'] = int(version[0])
+ semver['minor'] = int(version[1])
+ semver['patch'] = int(version[2])
+ except (IndexError, ValueError):
+ pass
+ return semver
+
+ @property
+ def can_update_semver(self):
+ # return early if there's no update filter set
+ if "pyup: update" not in self.line:
+ return True
+ update = self.line.split("pyup: update")[1].strip().split("#")[0]
+ current_version = Requirement.convert_semver(next(iter(self.specs._specs))._spec[1])
+ next_version = Requirement.convert_semver(self.latest_version)
+ if update == "major":
+ if current_version['major'] < next_version['major']:
+ return True
+ elif update == 'minor':
+ if current_version['major'] < next_version['major'] \
+ or current_version['minor'] < next_version['minor']:
+ return True
+ return False
+
+ @property
+ def filter(self):
+ rqfilter = False
+ if "rq.filter:" in self.line:
+ rqfilter = self.line.split("rq.filter:")[1].strip().split("#")[0]
+ elif "pyup:" in self.line:
+ if "pyup: update" not in self.line:
+ rqfilter = self.line.split("pyup:")[1].strip().split("#")[0]
+ # unset the filter once the date set in 'until' is reached
+ if "until" in rqfilter:
+ rqfilter, until = [l.strip() for l in rqfilter.split("until")]
+ try:
+ until = datetime.strptime(until, "%Y-%m-%d")
+ if until < datetime.now():
+ rqfilter = False
+ except ValueError:
+ # wrong date formatting
+ pass
+ if rqfilter:
+ try:
+ rqfilter, = parse_requirements("filter " + rqfilter)
+ if len(rqfilter.specifier._specs) > 0:
+ return rqfilter.specifier
+ except ValueError:
+ pass
+ return False
+
+ @property
+ def version(self):
+ if self.is_pinned:
+ return next(iter(self.specs._specs))._spec[1]
+
+ specs = self.specs
+ if self.filter:
+ specs = SpecifierSet(
+ ",".join(["".join(s._spec) for s in list(specs._specs) + list(self.filter._specs)])
+ )
+ return self.get_latest_version_within_specs(
+ specs,
+ versions=self.package.versions,
+ prereleases=self.prereleases
+ )
+
+ def get_hashes(self, version):
+ r = requests.get('https://pypi.org/pypi/{name}/{version}/json'.format(
+ name=self.key,
+ version=version
+ ))
+ hashes = []
+ data = r.json()
+
+ for item in data.get("urls", {}):
+ sha256 = item.get("digests", {}).get("sha256", False)
+ if sha256:
+ hashes.append({"hash": sha256, "method": "sha256"})
+ return hashes
+
+ def update_version(self, content, version, update_hashes=True):
+ if self.file_type == filetypes.tox_ini:
+ updater_class = updater.ToxINIUpdater
+ elif self.file_type == filetypes.conda_yml:
+ updater_class = updater.CondaYMLUpdater
+ elif self.file_type == filetypes.requirements_txt:
+ updater_class = updater.RequirementsTXTUpdater
+ elif self.file_type == filetypes.pipfile:
+ updater_class = updater.PipfileUpdater
+ elif self.file_type == filetypes.pipfile_lock:
+ updater_class = updater.PipfileLockUpdater
+ elif self.file_type == filetypes.setup_cfg:
+ updater_class = updater.SetupCFGUpdater
+ else:
+ raise NotImplementedError
+
+ dep = Dependency(
+ name=self.name,
+ specs=self.specs,
+ line=self.line,
+ line_numbers=[self.lineno, ] if self.lineno != 0 else None,
+ dependency_type=self.file_type,
+ hashes=self.hashes,
+ extras=self.extras
+ )
+ hashes = []
+ if self.hashes and update_hashes:
+ hashes = self.get_hashes(version)
+
+ return updater_class.update(
+ content=content,
+ dependency=dep,
+ version=version,
+ hashes=hashes,
+ spec="=="
+ )
+
+ @classmethod
+ def parse(cls, s, lineno, file_type=filetypes.requirements_txt):
+ # setuptools requires a space before the comment. If this isn't the case, add it.
+ if "\t#" in s:
+ parsed, = parse_requirements(s.replace("\t#", "\t #"))
+ else:
+ parsed, = parse_requirements(s)
+
+ return cls(
+ name=parsed.name,
+ specs=parsed.specifier,
+ line=s,
+ lineno=lineno,
+ extras=parsed.extras,
+ file_type=file_type
+ )
diff --git a/safety/alerts/templates/issue.jinja2 b/safety/alerts/templates/issue.jinja2
new file mode 100644
index 00000000..b364e0b3
--- /dev/null
+++ b/safety/alerts/templates/issue.jinja2
@@ -0,0 +1,52 @@
+Safety has detected a vulnerable package, [{{ pkg }}]({{ remediation['more_info_url'] }}), that should be updated from **{{ remediation['current_version'] }}** to **{{ remediation['recommended_version'] }}** to fix {{ vulns | length }} vulnerabilit{{ "y" if vulns|length == 1 else "ies" }}{% if overall_impact %}{{ " rated " + overall_impact if vulns|length == 1 else " with the highest rating being " + overall_impact }}{% endif %}.
+
+To read more about the impact of {{ "this vulnerability" if vulns|length == 1 else "these vulnerabilities" }} see [PyUp’s {{ pkg }} page]({{ remediation['more_info_url'] }}).
+
+If you're using `pip`, you can run:
+
+```
+pip install {{ pkg }}=={{ remediation['recommended_version'] }}
+# Followed by a pip freeze
+```
+
+
+ Vulnerabilities Found
+{% for vuln in vulns %}
+ * {{ vuln.advisory }}
+{% if vuln.severity and vuln.severity.cvssv3 and vuln.severity.cvssv3.base_severity %}
+ * This vulnerability was rated {{ vuln.severity.cvssv3.base_severity }} ({{ vuln.severity.cvssv3.base_score }}) on CVSSv3.
+{% endif %}
+ * To read more about this vulnerability, see PyUp’s [vulnerability page]({{ vuln.more_info_url }})
+{% endfor %}
+
+
+
+ Changelog from {{ remediation['current_version'] }} to {{ remediation['recommended_version'] }}
+{% if summary_changelog %}
+The full changelog is too long to post here. Please see [PyUp’s {{ pkg }} page]({{ remediation['more_info_url'] }}) for more information.
+{% else %}
+{% for version, log in changelog.items() %}
+### {{ version }}
+
+ ```
+ {{ log }}
+ ```
+{% endfor %}
+{% endif %}
+
+
+
+ Ignoring This Vulnerability
+
+If you wish to [ignore this vulnerability](https://docs.pyup.io/docs/safety-20-policy-file), you can add the following to `.safety-policy.yml` in this repo:
+
+```
+security:
+ ignore-vulnerabilities:{% for vuln in vulns %}
+ {{ vuln.vulnerability_id }}:
+ reason: enter a reason as to why you're ignoring this vulnerability
+ expires: 'YYYY-MM-DD' # datetime string - date this ignore will expire
+{% endfor %}
+```
+
+
diff --git a/safety/alerts/templates/pr.jinja2 b/safety/alerts/templates/pr.jinja2
new file mode 100644
index 00000000..4c4d13bc
--- /dev/null
+++ b/safety/alerts/templates/pr.jinja2
@@ -0,0 +1,46 @@
+Vulnerability fix: This PR updates [{{ pkg }}]({{ remediation['more_info_url'] }}) from **{{ remediation['current_version'] }}** to **{{ remediation['recommended_version'] }}** to fix {{ vulns | length }} vulnerabilit{{ "y" if vulns|length == 1 else "ies" }}{% if overall_impact %}{{ " rated " + overall_impact if vulns|length == 1 else " with the highest rating being " + overall_impact }}{% endif %}.
+
+To read more about the impact of {{ "this vulnerability" if vulns|length == 1 else "these vulnerabilities" }} see [PyUp’s {{ pkg }} page]({{ remediation['more_info_url'] }}).
+
+
+
+ Vulnerabilities Fixed
+{% for vuln in vulns %}
+ * {{ vuln.advisory }}
+{% if vuln.severity and vuln.severity.cvssv3 and vuln.severity.cvssv3.base_severity %}
+ * This vulnerability was rated {{ vuln.severity.cvssv3.base_severity }} ({{ vuln.severity.cvssv3.base_score }}) on CVSSv3.
+{% endif %}
+ * To read more about this vulnerability, see PyUp’s [vulnerability page]({{ vuln.more_info_url }})
+{% endfor %}
+
+
+
+ Changelog
+{% if summary_changelog %}
+The full changelog is too long to post here. Please see [PyUp’s {{ pkg }} page]({{ remediation['more_info_url'] }}) for more information.
+{% else %}
+{% for version, log in changelog.items() %}
+### {{ version }}
+
+ ```
+ {{ log }}
+ ```
+{% endfor %}
+{% endif %}
+
+
+
+ Ignoring This Vulnerability
+
+If you wish to [ignore this vulnerability](https://docs.pyup.io/docs/safety-20-policy-file), you can add the following to `.safety-policy.yml` in this repo:
+
+```
+security:
+ ignore-vulnerabilities:{% for vuln in vulns %}
+ {{ vuln.vulnerability_id }}:
+ reason: enter a reason as to why you're ignoring this vulnerability
+ expires: 'YYYY-MM-DD' # datetime string - date this ignore will expire
+{% endfor %}
+```
+
+
diff --git a/safety/alerts/utils.py b/safety/alerts/utils.py
new file mode 100644
index 00000000..91b7e221
--- /dev/null
+++ b/safety/alerts/utils.py
@@ -0,0 +1,132 @@
+import hashlib
+import os
+import sys
+
+from functools import wraps
+from packaging.version import parse as parse_version
+from pathlib import Path
+
+import click
+
+# Jinja2 will only be installed if the optional deps are installed.
+# It's fine if our functions fail, but don't let this top level
+# import error out.
+try:
+ import jinja2
+except ImportError:
+ jinja2 = None
+
+import requests
+
+
+def highest_base_score(vulns):
+ highest_base_score = 0
+ for vuln in vulns:
+ if vuln['severity'] is not None:
+ highest_base_score = max(highest_base_score, (vuln['severity'].get('cvssv3', {}) or {}).get('base_score', 10))
+
+ return highest_base_score
+
+def generate_branch_name(pkg, remediation):
+ return pkg + "/" + remediation['recommended_version']
+
+def generate_issue_title(pkg, remediation):
+ return f"Security Vulnerability in {pkg}"
+
+def generate_title(pkg, remediation, vulns):
+ suffix = "y" if len(vulns) == 1 else "ies"
+ return f"Update {pkg} from {remediation['current_version']} to {remediation['recommended_version']} to fix {len(vulns)} vulnerabilit{suffix}"
+
+def generate_body(pkg, remediation, vulns, *, api_key):
+ changelog = fetch_changelog(pkg, remediation['current_version'], remediation['recommended_version'], api_key=api_key)
+
+ p = Path(__file__).parent / 'templates'
+ env = jinja2.Environment(loader=jinja2.FileSystemLoader(Path(p)))
+ template = env.get_template('pr.jinja2')
+
+ overall_impact = cvss3_score_to_label(highest_base_score(vulns))
+ result = template.render({"pkg": pkg, "remediation": remediation, "vulns": vulns, "changelog": changelog, "overall_impact": overall_impact, "summary_changelog": False })
+
+ # GitHub has a PR body length limit of 65536. If we're going over that, skip the changelog and just use a link.
+ if len(result) > 65500:
+ return template.render({"pkg": pkg, "remediation": remediation, "vulns": vulns, "changelog": changelog, "overall_impact": overall_impact, "summary_changelog": True })
+
+ return result
+
+def generate_issue_body(pkg, remediation, vulns, *, api_key):
+ changelog = fetch_changelog(pkg, remediation['current_version'], remediation['recommended_version'], api_key=api_key)
+
+ p = Path(__file__).parent / 'templates'
+ env = jinja2.Environment(loader=jinja2.FileSystemLoader(Path(p)))
+ template = env.get_template('issue.jinja2')
+
+ overall_impact = cvss3_score_to_label(highest_base_score(vulns))
+ result = template.render({"pkg": pkg, "remediation": remediation, "vulns": vulns, "changelog": changelog, "overall_impact": overall_impact, "summary_changelog": False })
+
+    if len(result) > 65500:  # GitHub caps issue/PR bodies at 65536 chars; fall back to a summary changelog
+        return template.render({"pkg": pkg, "remediation": remediation, "vulns": vulns, "changelog": changelog, "overall_impact": overall_impact, "summary_changelog": True })
+    return result
+
+def generate_commit_message(pkg, remediation):
+ return f"Update {pkg} from {remediation['current_version']} to {remediation['recommended_version']}"
+
+def git_sha1(raw_contents):
+ return hashlib.sha1(b"blob " + str(len(raw_contents)).encode('ascii') + b"\0" + raw_contents).hexdigest()
+
+def fetch_changelog(package, from_version, to_version, *, api_key):
+ from_version = parse_version(from_version)
+ to_version = parse_version(to_version)
+ changelog = {}
+
+ r = requests.get(
+ "https://pyup.io/api/v1/changelogs/{}/".format(package),
+ headers={"X-Api-Key": api_key}
+ )
+
+ if r.status_code == 200:
+ data = r.json()
+ if data:
+ # sort the changelog by release
+ sorted_log = sorted(data.items(), key=lambda v: parse_version(v[0]), reverse=True)
+
+ # go over each release and add it to the log if it's within the "upgrade
+ # range" e.g. update from 1.2 to 1.3 includes a changelog for 1.2.1 but
+ # not for 0.4.
+ for version, log in sorted_log:
+ parsed_version = parse_version(version)
+ if parsed_version > from_version and parsed_version <= to_version:
+ changelog[version] = log
+
+ return changelog
+
+def cvss3_score_to_label(score):
+ if score >= 0.1 and score <= 3.9:
+ return 'low'
+ elif score >= 4.0 and score <= 6.9:
+ return 'medium'
+ elif score >= 7.0 and score <= 8.9:
+ return 'high'
+ elif score >= 9.0:
+ return 'critical'
+
+ return None
+
+def require_files_report(func):
+ @wraps(func)
+ def inner(obj, *args, **kwargs):
+ if obj.report['report_meta']['scan_target'] != "files":
+ click.secho("This report was generated against an environment, but this alerter requires a file.", fg='red')
+ sys.exit(1)
+
+ files = obj.report['report_meta']['scanned']
+ obj.requirements_files = {}
+ for f in files:
+ if not os.path.exists(f):
+ cwd = os.getcwd()
+ click.secho("A requirements file scanned in the report, {}, does not exist (looking in {}).".format(f, cwd), fg='red')
+ sys.exit(1)
+
+            obj.requirements_files[f] = Path(f).read_bytes()
+
+ return func(obj, *args, **kwargs)
+ return inner
diff --git a/safety/cli.py b/safety/cli.py
index 58e73b2a..41130e1c 100644
--- a/safety/cli.py
+++ b/safety/cli.py
@@ -10,6 +10,7 @@
import click
from safety import safety
+from safety.alerts import alert
from safety.constants import EXIT_CODE_VULNERABILITIES_FOUND, EXIT_CODE_OK, EXIT_CODE_FAILURE
from safety.errors import SafetyException, SafetyError
from safety.formatter import SafetyFormatter
@@ -364,6 +365,7 @@ def validate(ctx, name, path):
click.secho(f'The Safety policy file was successfully parsed with the following values:', fg='green')
click.secho(json.dumps(values, indent=4, default=str))
+cli.add_command(alert)
if __name__ == "__main__":
cli()
diff --git a/safety/formatters/json.py b/safety/formatters/json.py
index 46575958..0de11376 100644
--- a/safety/formatters/json.py
+++ b/safety/formatters/json.py
@@ -2,6 +2,8 @@
import json as json_parser
+from requests.models import PreparedRequest
+
from safety.formatter import FormatterAPI
from safety.output_utils import get_report_brief_info
from safety.util import get_basic_announcements
@@ -46,6 +48,13 @@ def render_vulnerabilities(self, announcements, vulnerabilities, remediations, f
other_v != recommended_version]
remed[k]['more_info_url'] = v.get('more_info_url', '')
+ # Use Request's PreparedRequest to handle parsing, joining etc the URL since we're adding query
+ # parameters and don't know what the server might send down.
+ if remed[k]['more_info_url']:
+ req = PreparedRequest()
+ req.prepare_url(remed[k]['more_info_url'], {'from': remed[k]['current_version'], 'to': recommended_version})
+ remed[k]['more_info_url'] = req.url
+
template = {
"report_meta": report,
"scanned_packages": {p.name: p.to_dict(short_version=True) for p in packages},
diff --git a/safety/safety-policy-template.yml b/safety/safety-policy-template.yml
index 43fa60c1..8efb3158 100644
--- a/safety/safety-policy-template.yml
+++ b/safety/safety-policy-template.yml
@@ -12,3 +12,70 @@ security: # configuration for the `safety check` command
reason: we don't use the vulnerable function # optional, for internal note purposes to communicate with your team. This reason will be reported in the Safety reports
expires: '2022-10-21' # datetime string - date this ignore will expire, best practice to use this variable
continue-on-vulnerability-error: False # Suppress non-zero exit codes when vulnerabilities are found. Enable this in pipelines and CI/CD processes if you want to pass builds that have vulnerabilities. We recommend you set this to False.
+alert: # configuration for the `safety alert` command
+ security:
+ # Configuration specific to Safety's GitHub Issue alerting
+ github-issue:
+ # Same as for security - these allow controlling if this alert will fire based
+ # on severity information.
+ # default: not set
+ # ignore-cvss-severity-below: 6
+ # ignore-cvss-unknown-severity: False
+
+ # Add a label to pull requests with the cvss severity, if available
+ # default: true
+ # label-severity: True
+
+ # Add a label to pull requests, default is 'security'
+ # requires private repo permissions, even on public repos
+ # default: security
+ # labels:
+ # - security
+
+ # Assign users to pull requests, default is not set
+ # requires private repo permissions, even on public repos
+ # default: empty
+ # assignees:
+ # - example-user
+
+ # Prefix to give issues when creating them. Note that changing
+ # this might cause duplicate issues to be created.
+ # default: "[PyUp] "
+ # issue-prefix: "[PyUp] "
+
+ # Configuration specific to Safety's GitHub PR alerting
+ github-pr:
+ # Same as for security - these allow controlling if this alert will fire based
+ # on severity information.
+ # default: not set
+ # ignore-cvss-severity-below: 6
+ # ignore-cvss-unknown-severity: False
+
+ # Set the default branch (ie, main, master)
+ # default: empty, the default branch on GitHub
+ branch: ''
+
+ # Add a label to pull requests with the cvss severity, if available
+ # default: true
+ # label-severity: True
+
+ # Add a label to pull requests, default is 'security'
+ # requires private repo permissions, even on public repos
+ # default: security
+ # labels:
+ # - security
+
+ # Assign users to pull requests, default is not set
+ # requires private repo permissions, even on public repos
+ # default: empty
+ # assignees:
+ # - example-user
+
+ # Configure the branch prefix for PRs created by this alert.
+ # NB: Changing this will likely cause duplicate PRs.
+ # default: pyup/
+ branch-prefix: pyup/
+
+ # Set a global prefix for PRs
+ # default: "[PyUp] "
+ pr-prefix: "[PyUp] "
diff --git a/setup.cfg b/setup.cfg
index 35ef688a..db895e9f 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -41,6 +41,7 @@ install_requires =
packaging>=21.0
dparse>=0.6.2
ruamel.yaml>=0.17.21
+ dataclasses==0.8; python_version=="3.6"
[options.entry_points]
console_scripts =
@@ -48,5 +49,10 @@ console_scripts =
python_requires = >=3.6
+[options.extras_require]
+github = pygithub>=1.43.3
+ jinja2>=3.1.0
+gitlab = python-gitlab>=1.3.0
+
[flake8]
exclude = docs
diff --git a/test_requirements.txt b/test_requirements.txt
index 60167d4e..8fe00d25 100644
--- a/test_requirements.txt
+++ b/test_requirements.txt
@@ -6,3 +6,4 @@ requests
packaging>=21.0
dparse>=0.6.2
ruamel.yaml>=0.17.21
+dataclasses==0.8; python_version=="3.6"
diff --git a/tests/formatters/test_json.py b/tests/formatters/test_json.py
index 8ecfcdd6..a76a345f 100644
--- a/tests/formatters/test_json.py
+++ b/tests/formatters/test_json.py
@@ -29,7 +29,7 @@ def test_render_vulnerabilities_with_remediations(self, get_report_brief_info):
insecure_versions=['4.0.1'], secure_versions=['4.0.4', '3.2.13', '2.2.28'],
latest_version_without_known_vulnerabilities='',
latest_version='4.0.4',
- more_info_url='https://pyup.io/packages/pypi/django/')
+ more_info_url='https://pyup.io/packages/pypi/django/?from=4.0.1&to=4.0.4')
remediations = {
'django': {'vulns_found': 1, 'version': '4.0.1', 'secure_versions': ['2.2.28', '3.2.13', '4.0.4'],
@@ -103,7 +103,7 @@ def test_render_vulnerabilities_with_remediations(self, get_report_brief_info):
],
"latest_version_without_known_vulnerabilities": "",
"latest_version": "4.0.4",
- "more_info_url": "https://pyup.io/packages/pypi/django/"
+ "more_info_url": "https://pyup.io/packages/pypi/django/?from=4.0.1&to=4.0.4"
}
},
"announcements": [],
@@ -162,7 +162,7 @@ def test_render_vulnerabilities_with_remediations(self, get_report_brief_info):
"2.2.28",
"3.2.13"
],
- "more_info_url": "https://pyup.io/packages/pypi/django/"
+ "more_info_url": "https://pyup.io/packages/pypi/django/?from=4.0.1&to=4.0.4"
}
}
},