Skip to content

Commit

Permalink
Add curl advisories
Browse files Browse the repository at this point in the history
- added curl importer
- added tests for curl importer
Signed-off-by: ambuj <kulshreshthaak.12@gmail.com>
  • Loading branch information
ambuj-1211 committed May 10, 2024
1 parent 766cb3e commit 24b5eaa
Show file tree
Hide file tree
Showing 11 changed files with 557 additions and 0 deletions.
2 changes: 2 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from vulnerabilities.importers import apache_kafka
from vulnerabilities.importers import apache_tomcat
from vulnerabilities.importers import archlinux
from vulnerabilities.importers import curl
from vulnerabilities.importers import debian
from vulnerabilities.importers import debian_oval
from vulnerabilities.importers import elixir_security
Expand Down Expand Up @@ -71,6 +72,7 @@
oss_fuzz.OSSFuzzImporter,
ruby.RubyImporter,
github_osv.GithubOSVImporter,
curl.CurlImporter,
]

IMPORTERS_REGISTRY = {x.qualified_name: x for x in IMPORTERS_REGISTRY}
170 changes: 170 additions & 0 deletions vulnerabilities/importers/curl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/nexB/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import logging
from datetime import datetime
from datetime import timezone
from typing import Iterable
from typing import Mapping

import requests
from cwe2.database import Database
from packageurl import PackageURL
from univers.version_range import GenericVersionRange
from univers.versions import SemverVersion

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackage
from vulnerabilities.importer import Importer
from vulnerabilities.importer import Reference
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.severity_systems import SCORING_SYSTEMS
from vulnerabilities.utils import fetch_response
from vulnerabilities.utils import get_cwe_id
from vulnerabilities.utils import get_item

logger = logging.getLogger(__name__)


class CurlImporter(Importer):

spdx_license_expression = "curl"
license_url = "https://curl.se/docs/copyright.html"
repo_url = "https://github.com/curl/curl-www/"
importer_name = "Curl Importer"
api_url = "https://curl.se/docs/vuln.json"

def fetch(self) -> Iterable[Mapping]:
response = fetch_response(self.api_url)
return response.json()

def advisory_data(self) -> Iterable[AdvisoryData]:
raw_data = self.fetch()
for data in raw_data:
cve_id = data.get("aliases") or []
cve_id = cve_id[0] if len(cve_id) > 0 else None
if not cve_id.startswith("CVE"):
package = data.get("database_specific").get("package")
logger.error(f"Invalid CVE ID: {cve_id} in package {package}")
continue
yield parse_advisory_data(data)


def parse_advisory_data(raw_data) -> AdvisoryData:
"""
Parse advisory data from raw JSON data and return an AdvisoryData object.
Args:
raw_data (dict): Raw JSON data containing advisory information.
Returns:
AdvisoryData: Parsed advisory data as an AdvisoryData object.
Example:
>>> raw_data = {
... "aliases": ["CVE-2024-2379"],
... "summary": "QUIC certificate check bypass with wolfSSL",
... "database_specific": {
... "package": "curl",
... "URL": "https://curl.se/docs/CVE-2024-2379.json",
... "www": "https://curl.se/docs/CVE-2024-2379.html",
... "issue": "https://hackerone.com/reports/2410774",
... "severity": "Low",
... "CWE": {
... "id": "CWE-297",
... "desc": "Improper Validation of Certificate with Host Mismatch"
... },
... },
... "published": "2024-03-27T08:00:00.00Z",
... "affected": [
... {
... "ranges": [
... {
... "type": "SEMVER",
... "events": [
... {"introduced": "8.6.0"},
... {"fixed": "8.7.0"}
... ]
... }
... ],
... "versions": ["8.6.0"]
... }
... ]
... }
>>> parse_advisory_data(raw_data)
AdvisoryData(aliases=['CVE-2024-2379'], summary='QUIC certificate check bypass with wolfSSL', affected_packages=[AffectedPackage(package=PackageURL(type='generic', namespace='curl.se', name='curl', version=None, qualifiers={}, subpath=None), affected_version_range=GenericVersionRange(constraints=(VersionConstraint(comparator='=', version=SemverVersion(string='8.6.0')),)), fixed_version=SemverVersion(string='8.7.0'))], references=[Reference(reference_id='', url='https://curl.se/docs/CVE-2024-2379.html', severities=[VulnerabilitySeverity(system=ScoringSystem(identifier='generic_textual', name='Generic textual severity rating', url='', notes='Severity for generic scoring systems. Contains generic textual values like High, Low etc'), value='Low', scoring_elements='')]), Reference(reference_id='', url='https://hackerone.com/reports/2410774', severities=[])], date_published=datetime.datetime(2024, 3, 27, 8, 0, tzinfo=datetime.timezone.utc), weaknesses=[297], url='https://curl.se/docs/CVE-2024-2379.json')
"""

affected = get_item(raw_data, "affected")[0] if len(get_item(raw_data, "affected")) > 0 else []

ranges = get_item(affected, "ranges")[0] if len(get_item(affected, "ranges")) > 0 else []
events = get_item(ranges, "events")[1] if len(get_item(ranges, "events")) > 1 else {}
version_type = get_item(ranges, "type") if get_item(ranges, "type") else ""
fixed_version = events.get("fixed")
if version_type == "SEMVER" and fixed_version:
fixed_version = SemverVersion(fixed_version)

purl = PackageURL(type="generic", namespace="curl.se", name="curl")
versions = affected.get("versions") or []
affected_version_range = GenericVersionRange.from_versions(versions)

affected_package = AffectedPackage(
package=purl, affected_version_range=affected_version_range, fixed_version=fixed_version
)

database_specific = raw_data.get("database_specific") or {}
severity = VulnerabilitySeverity(
system=SCORING_SYSTEMS["generic_textual"], value=database_specific.get("severity", "")
)

references = []
ref_www = database_specific.get("www") or ""
ref_issue = database_specific.get("issue") or ""
if ref_www:
references.append(Reference(url=ref_www, severities=[severity]))
if ref_issue:
references.append(Reference(url=ref_issue))

date_published = datetime.strptime(
raw_data.get("published") or "", "%Y-%m-%dT%H:%M:%S.%fZ"
).replace(tzinfo=timezone.utc)
weaknesses = get_cwe_from_curl_advisory(raw_data)

return AdvisoryData(
aliases=raw_data.get("aliases") or [],
summary=raw_data.get("summary") or "",
affected_packages=[affected_package],
references=references,
date_published=date_published,
weaknesses=weaknesses,
url=raw_data.get("database_specific", {}).get("URL", ""),
)


def get_cwe_from_curl_advisory(raw_data):
"""
Extracts CWE IDs from the given raw_data and returns a list of CWE IDs.
>>> get_cwe_from_curl_advisory({"database_specific": {"CWE": {"id": "CWE-333"}}})
[333]
>>> get_cwe_from_curl_advisory({"database_specific": {"CWE": {"id": ""}}})
[]
"""
weaknesses = []
db = Database()
cwe_string = get_item(raw_data, "database_specific", "CWE", "id") or ""

if cwe_string:
cwe_id = get_cwe_id(cwe_string)
try:
db.get(cwe_id)
weaknesses.append(cwe_id)
except Exception:
logger.error("Invalid CWE id")
return weaknesses
1 change: 1 addition & 0 deletions vulnerabilities/improvers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
valid_versions.RubyImprover,
valid_versions.GithubOSVImprover,
vulnerability_status.VulnerabilityStatusImprover,
valid_versions.CurlImprover,
]

IMPROVERS_REGISTRY = {x.qualified_name: x for x in IMPROVERS_REGISTRY}
6 changes: 6 additions & 0 deletions vulnerabilities/improvers/valid_versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from vulnerabilities.importers.apache_httpd import ApacheHTTPDImporter
from vulnerabilities.importers.apache_kafka import ApacheKafkaImporter
from vulnerabilities.importers.apache_tomcat import ApacheTomcatImporter
from vulnerabilities.importers.curl import CurlImporter
from vulnerabilities.importers.debian import DebianImporter
from vulnerabilities.importers.debian_oval import DebianOvalImporter
from vulnerabilities.importers.elixir_security import ElixirSecurityImporter
Expand Down Expand Up @@ -472,3 +473,8 @@ class RubyImprover(ValidVersionImprover):
class GithubOSVImprover(ValidVersionImprover):
importer = GithubOSVImporter
ignorable_versions = []


class CurlImprover(ValidVersionImprover):
importer = CurlImporter
ignorable_versions = []
73 changes: 73 additions & 0 deletions vulnerabilities/tests/test_curl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/nexB/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import os
from unittest import TestCase
from unittest.mock import patch

from vulnerabilities.importers.curl import get_cwe_from_curl_advisory
from vulnerabilities.importers.curl import parse_advisory_data
from vulnerabilities.tests import util_tests
from vulnerabilities.utils import load_json

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
TEST_DATA = os.path.join(BASE_DIR, "test_data/curl")


class TestCurlImporter(TestCase):
def test_parse_advisory_data1(self):
mock_response = load_json(os.path.join(TEST_DATA, "curl_advisory_mock1.json"))
expected_file = os.path.join(TEST_DATA, "expected_curl_advisory_output1.json")
result = parse_advisory_data(mock_response)
result = result.to_dict()
util_tests.check_results_against_json(result, expected_file)

def test_parse_advisory_data2(self):
mock_response = load_json(os.path.join(TEST_DATA, "curl_advisory_mock2.json"))
expected_file = os.path.join(TEST_DATA, "expected_curl_advisory_output2.json")
result = parse_advisory_data(mock_response)
result = result.to_dict()
util_tests.check_results_against_json(result, expected_file)

def test_parse_advisory_data3(self):
mock_response = load_json(os.path.join(TEST_DATA, "curl_advisory_mock3.json"))
expected_file = os.path.join(TEST_DATA, "expected_curl_advisory_output3.json")
result = parse_advisory_data(mock_response)
result = result.to_dict()
util_tests.check_results_against_json(result, expected_file)

def test_get_cwe_from_curl_advisory(self):
assert get_cwe_from_curl_advisory(
{
"id": "CURL-CVE-2024-2466",
"database_specific": {
"CWE": {
"id": "CWE-297",
"desc": "Improper Validation of Certificate with Host Mismatch",
},
},
}
) == [297]

mock_advisory = [
{
"id": "CURL-CVE-XXXX-XXXX",
"database_specific": {"CWE": {"id": "CWE-111111111", "desc": "Invalid weaknesses"}},
},
{
"id": "CURL-CVE-2024-2466",
"database_specific": {
"CWE": {"id": "CWE-311", "desc": "Missing Encryption of Sensitive Data"},
},
},
]
mock_cwe_list = []
for advisory in mock_advisory:
mock_cwe_list.extend(get_cwe_from_curl_advisory(advisory))
assert mock_cwe_list == [311]
61 changes: 61 additions & 0 deletions vulnerabilities/tests/test_data/curl/curl_advisory_mock1.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
{
"schema_version": "1.5.0",
"id": "CURL-CVE-2024-2379",
"aliases": [
"CVE-2024-2379"
],
"summary": "QUIC certificate check bypass with wolfSSL",
"modified": "2024-03-26T10:36:00.00Z",
"database_specific": {
"package": "curl",
"URL": "https://curl.se/docs/CVE-2024-2379.json",
"www": "https://curl.se/docs/CVE-2024-2379.html",
"issue": "https://hackerone.com/reports/2410774",
"CWE": {
"id": "CWE-295",
"desc": "Improper Certificate Validation"
},
"award": {
"amount": "540",
"currency": "USD"
},
"last_affected": "8.6.0",
"severity": "Low"
},
"published": "2024-03-27T08:00:00.00Z",
"affected": [
{
"ranges": [
{
"type": "SEMVER",
"events": [
{"introduced": "8.6.0"},
{"fixed": "8.7.0"}
]
},
{
"type": "GIT",
"repo": "https://github.com/curl/curl.git",
"events": [
{"introduced": "5d044ad9480a9f556f4b6a252d7533b1ba7fe57e"},
{"fixed": "aedbbdf18e689a5eee8dc39600914f5eda6c409c"}
]
}
],
"versions": [
"8.6.0"
]
}
],
"credits": [
{
"name": "Dexter Gerig",
"type": "FINDER"
},
{
"name": "Daniel Stenberg",
"type": "REMEDIATION_DEVELOPER"
}
],
"details": "libcurl skips the certificate verification for a QUIC connection under certain\nconditions, when built to use wolfSSL. If told to use an unknown/bad cipher or\ncurve, the error path accidentally skips the verification and returns OK, thus\nignoring any certificate problems."
}

0 comments on commit 24b5eaa

Please sign in to comment.