Skip to content

Commit

Permalink
Add detached payload support for JWS encoding and decoding
Browse files Browse the repository at this point in the history
Specifications allow to have JWS with unencoded detached payloads.
This changeset adds detached payload support for encoding and decoding
functions.

For encoding, detached payload can be enabled by setting the
"is_payload_detached" arg or having the "b64=False" inside the headers.

For decoding, the detached payload content (bytes) has to be provided
with the "detached_payload" arg and "b64=False" has to be found inside
the decoded headers.

Functionnally, when this feature is used, the signature will be computed
over the raw data bytes of the payload, without being base64 encoded and
obviously, the payload will not be provided inside the generated JWS.
So, the generated JWS will look like:
base64url(header)..base64url(signature)

Relevant specifications:
RFC 7515: "JSON Web Signature (JWS)". (Annexe F)
RFC 7797: "JSON Web Signature (JWS) Unencoded Payload Option".
  • Loading branch information
fviard committed Jan 16, 2022
1 parent 77d7916 commit 676d9d4
Showing 1 changed file with 36 additions and 9 deletions.
45 changes: 36 additions & 9 deletions jwt/api_jws.py
Expand Up @@ -80,34 +80,54 @@ def encode(
algorithm: Optional[str] = "HS256",
headers: Optional[Dict] = None,
json_encoder: Optional[Type[json.JSONEncoder]] = None,
is_payload_detached: bool = False,
) -> str:
segments = []

if algorithm is None:
algorithm = "none"

# Prefer headers["alg"] if present to algorithm parameter.
if headers and "alg" in headers and headers["alg"]:
algorithm = headers["alg"]
# Prefer headers values if present to function parameters.
if headers:
headers_alg = headers.get("alg")
if headers_alg:
algorithm = headers["alg"]

headers_b64 = headers.get("b64")
if headers_b64 is False:
is_payload_detached = True

# Header
header = {"typ": self.header_typ, "alg": algorithm}
header = {"typ": self.header_typ, "alg": algorithm} # type: Dict[str, Any]

if headers:
self._validate_headers(headers)
header.update(headers)
if not header["typ"]:
del header["typ"]

if not header["typ"]:
del header["typ"]

if is_payload_detached:
header["b64"] = False
elif "b64" in header:
# True is the standard value for b64, so no need for it
del header["b64"]

json_header = json.dumps(
header, separators=(",", ":"), cls=json_encoder
).encode()

segments.append(base64url_encode(json_header))
segments.append(base64url_encode(payload))

if is_payload_detached:
msg_payload = payload
else:
msg_payload = base64url_encode(payload)
segments.append(msg_payload)

# Segments
signing_input = b".".join(segments)

try:
alg_obj = self._algorithms[algorithm]
key = alg_obj.prepare_key(key)
Expand All @@ -119,11 +139,13 @@ def encode(
"Algorithm '%s' could not be found. Do you have cryptography "
"installed?" % algorithm
) from e
else:
raise NotImplementedError("Algorithm not supported") from e
raise NotImplementedError("Algorithm not supported") from e

segments.append(base64url_encode(signature))

# Don't put the payload content inside the encoded token when detached
if is_payload_detached:
segments[1] = b""
encoded_string = b".".join(segments)

return encoded_string.decode("utf-8")
Expand All @@ -134,6 +156,7 @@ def decode_complete(
key: str = "",
algorithms: Optional[List[str]] = None,
options: Optional[Dict] = None,
detached_payload: Optional[bytes] = None,
**kwargs,
) -> Dict[str, Any]:
if options is None:
Expand All @@ -148,6 +171,10 @@ def decode_complete(

payload, signing_input, header, signature = self._load(jwt)

if detached_payload is not None and header.get("b64", True) is False:
payload = detached_payload
signing_input = b".".join([signing_input.rsplit(b".", 1)[0], payload])

if verify_signature:
self._verify_signature(signing_input, header, signature, key, algorithms)

Expand Down

0 comments on commit 676d9d4

Please sign in to comment.