From e8b2ae73b8d54c7e3f61831715f9a4f49619937f Mon Sep 17 00:00:00 2001 From: pgjones Date: Tue, 21 Sep 2021 18:22:54 +0100 Subject: [PATCH] Fix the CSP header The header_property does not set the on_update method in the CSP datastructure which means any changes wouldn't be set in the headers. This fixes the issue by specifying the properties directly and including tests to ensure it works. --- CHANGES.rst | 1 + src/werkzeug/sansio/response.py | 82 ++++++++++++++++++++++++++------- tests/test_wrappers.py | 14 ++++++ 3 files changed, 81 insertions(+), 16 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 3ac9ccea21..74b93a037d 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -43,6 +43,7 @@ Unreleased - Ad-hoc TLS certs are generated with SAN matching CN. :issue:`2158` - Fix memory usage for locals when using Python 3.6 or pre 0.4.17 greenlet versions. :pr:`2212` +- Fix CSP header setting. :pr:`2237` Version 2.0.1 diff --git a/src/werkzeug/sansio/response.py b/src/werkzeug/sansio/response.py index 4d55a9ad70..1c458f2263 100644 --- a/src/werkzeug/sansio/response.py +++ b/src/werkzeug/sansio/response.py @@ -12,6 +12,7 @@ from ..utils import get_content_type from werkzeug.datastructures import CallbackDict from werkzeug.datastructures import ContentRange +from werkzeug.datastructures import ContentSecurityPolicy from werkzeug.datastructures import ResponseCacheControl from werkzeug.datastructures import WWWAuthenticate from werkzeug.http import COEP @@ -567,23 +568,72 @@ def on_update(www_auth: WWWAuthenticate) -> None: # CSP - content_security_policy = header_property( - "Content-Security-Policy", - None, - parse_csp_header, # type: ignore - dump_csp_header, - doc="""The Content-Security-Policy header adds an additional layer of - security to help detect and mitigate certain types of attacks.""", - ) - content_security_policy_report_only = header_property( - "Content-Security-Policy-Report-Only", - None, - parse_csp_header, # type: ignore - dump_csp_header, - doc="""The Content-Security-Policy-Report-Only header adds a csp policy + @property + def content_security_policy(self) -> ContentSecurityPolicy: + """The ``Content-Security-Policy`` header as a + :class:`~werkzeug.datastructures.ContentSecurityPolicy` object. Available + even if the header is not set. + + The Content-Security-Policy header adds an additional layer of + security to help detect and mitigate certain types of attacks. + """ + + def on_update(csp: ContentSecurityPolicy) -> None: + if not csp: + del self.headers["content-security-policy"] + else: + self.headers["Content-Security-Policy"] = csp.to_header() + + rv = parse_csp_header(self.headers.get("content-security-policy"), on_update) + if rv is None: + rv = ContentSecurityPolicy(None, on_update=on_update) + return rv + + @content_security_policy.setter + def content_security_policy( + self, value: t.Optional[t.Union[ContentSecurityPolicy, str]] + ) -> None: + if not value: + del self.headers["content-security-policy"] + elif isinstance(value, str): + self.headers["Content-Security-Policy"] = value + else: + self.headers["Content-Security-Policy"] = value.to_header() + + @property + def content_security_policy_report_only(self) -> ContentSecurityPolicy: + """The ``Content-Security-policy-report-only`` header as a + :class:`~werkzeug.datastructures.ContentSecurityPolicy` object. Available + even if the header is not set. + + The Content-Security-Policy-Report-Only header adds a csp policy that is not enforced but is reported thereby helping detect - certain types of attacks.""", - ) + certain types of attacks. + """ + + def on_update(csp: ContentSecurityPolicy) -> None: + if not csp: + del self.headers["content-security-policy-report-only"] + else: + self.headers["Content-Security-policy-report-only"] = csp.to_header() + + rv = parse_csp_header( + self.headers.get("content-security-policy-report-only"), on_update + ) + if rv is None: + rv = ContentSecurityPolicy(None, on_update=on_update) + return rv + + @content_security_policy_report_only.setter + def content_security_policy_report_only( + self, value: t.Optional[t.Union[ContentSecurityPolicy, str]] + ) -> None: + if not value: + del self.headers["content-security-policy-report-only"] + elif isinstance(value, str): + self.headers["Content-Security-policy-report-only"] = value + else: + self.headers["Content-Security-policy-report-only"] = value.to_header() # CORS diff --git a/tests/test_wrappers.py b/tests/test_wrappers.py index 8d7a5ae874..9130330709 100644 --- a/tests/test_wrappers.py +++ b/tests/test_wrappers.py @@ -1343,6 +1343,20 @@ def test_ranges(): assert resp.content_range.length == 1000 +def test_csp(): + resp = wrappers.Response() + resp.content_security_policy.default_src = "'self'" + assert resp.headers["Content-Security-Policy"] == "default-src 'self'" + resp.content_security_policy.script_src = "'self' palletsprojects.com" + assert ( + resp.headers["Content-Security-Policy"] + == "default-src 'self'; script-src 'self' palletsprojects.com" + ) + + resp.content_security_policy = None + assert "Content-Security-Policy" not in resp.headers + + def test_auto_content_length(): resp = wrappers.Response("Hello World!") assert resp.content_length == 12