From 676d9d4ed2b1be0f985a03d2d557cae4caa45a4f Mon Sep 17 00:00:00 2001 From: Florent Viard Date: Sun, 16 Jan 2022 02:21:38 +0100 Subject: [PATCH] Add detached payload support for JWS encoding and decoding Specifications allow to have JWS with unencoded detached payloads. This changeset adds detached payload support for encoding and decoding functions. For encoding, detached payload can be enabled by setting the "is_payload_detached" arg or having the "b64=False" inside the headers. For decoding, the detached payload content (bytes) has to be provided with the "detached_payload" arg and "b64=False" has to be found inside the decoded headers. Functionnally, when this feature is used, the signature will be computed over the raw data bytes of the payload, without being base64 encoded and obviously, the payload will not be provided inside the generated JWS. So, the generated JWS will look like: base64url(header)..base64url(signature) Relevant specifications: RFC 7515: "JSON Web Signature (JWS)". (Annexe F) RFC 7797: "JSON Web Signature (JWS) Unencoded Payload Option". --- jwt/api_jws.py | 45 ++++++++++++++++++++++++++++++++++++--------- 1 file changed, 36 insertions(+), 9 deletions(-) diff --git a/jwt/api_jws.py b/jwt/api_jws.py index 9e8e17875..fc8be3689 100644 --- a/jwt/api_jws.py +++ b/jwt/api_jws.py @@ -80,34 +80,54 @@ def encode( algorithm: Optional[str] = "HS256", headers: Optional[Dict] = None, json_encoder: Optional[Type[json.JSONEncoder]] = None, + is_payload_detached: bool = False, ) -> str: segments = [] if algorithm is None: algorithm = "none" - # Prefer headers["alg"] if present to algorithm parameter. - if headers and "alg" in headers and headers["alg"]: - algorithm = headers["alg"] + # Prefer headers values if present to function parameters. + if headers: + headers_alg = headers.get("alg") + if headers_alg: + algorithm = headers["alg"] + + headers_b64 = headers.get("b64") + if headers_b64 is False: + is_payload_detached = True # Header - header = {"typ": self.header_typ, "alg": algorithm} + header = {"typ": self.header_typ, "alg": algorithm} # type: Dict[str, Any] if headers: self._validate_headers(headers) header.update(headers) - if not header["typ"]: - del header["typ"] + + if not header["typ"]: + del header["typ"] + + if is_payload_detached: + header["b64"] = False + elif "b64" in header: + # True is the standard value for b64, so no need for it + del header["b64"] json_header = json.dumps( header, separators=(",", ":"), cls=json_encoder ).encode() segments.append(base64url_encode(json_header)) - segments.append(base64url_encode(payload)) + + if is_payload_detached: + msg_payload = payload + else: + msg_payload = base64url_encode(payload) + segments.append(msg_payload) # Segments signing_input = b".".join(segments) + try: alg_obj = self._algorithms[algorithm] key = alg_obj.prepare_key(key) @@ -119,11 +139,13 @@ def encode( "Algorithm '%s' could not be found. Do you have cryptography " "installed?" % algorithm ) from e - else: - raise NotImplementedError("Algorithm not supported") from e + raise NotImplementedError("Algorithm not supported") from e segments.append(base64url_encode(signature)) + # Don't put the payload content inside the encoded token when detached + if is_payload_detached: + segments[1] = b"" encoded_string = b".".join(segments) return encoded_string.decode("utf-8") @@ -134,6 +156,7 @@ def decode_complete( key: str = "", algorithms: Optional[List[str]] = None, options: Optional[Dict] = None, + detached_payload: Optional[bytes] = None, **kwargs, ) -> Dict[str, Any]: if options is None: @@ -148,6 +171,10 @@ def decode_complete( payload, signing_input, header, signature = self._load(jwt) + if detached_payload is not None and header.get("b64", True) is False: + payload = detached_payload + signing_input = b".".join([signing_input.rsplit(b".", 1)[0], payload]) + if verify_signature: self._verify_signature(signing_input, header, signature, key, algorithms)