Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding support for compressed payloads #753

Merged
merged 8 commits into from May 19, 2022
23 changes: 22 additions & 1 deletion jwt/api_jwt.py
@@ -1,5 +1,6 @@
import json
import warnings
import zlib
from calendar import timegm
from collections.abc import Iterable, Mapping
from datetime import datetime, timedelta, timezone
Expand Down Expand Up @@ -108,7 +109,7 @@ def decode_complete(
try:
payload = json.loads(decoded["payload"])
except ValueError as e:
raise DecodeError(f"Invalid payload string: {e}")
payload = self._decompress_payload(decoded["payload"], e)
if not isinstance(payload, dict):
raise DecodeError("Invalid payload string: must be a json object")

Expand All @@ -118,6 +119,26 @@ def decode_complete(
decoded["payload"] = payload
return decoded

@staticmethod
def _decompress_payload(payload, e):
"""
Smart Health cards use a raw-compressed (no header or crc) payload,
so before surfacing a UnicodeDecodeError, find out if it can be
uncompressed successfully
noinspection PyBroadException
"""
if isinstance(e, UnicodeDecodeError):
try:
payload = json.loads(
# wbits=-15 has zlib not worry about headers of crc's
zlib.decompress(payload, wbits=-15).decode("utf-8")
)
except Exception:
payload = None
if payload is not None:
return payload
raise DecodeError(f"Invalid payload string: {e}")

def decode(
self,
jwt: str,
Expand Down
33 changes: 33 additions & 0 deletions tests/test_api_jwt.py
Expand Up @@ -64,6 +64,39 @@ def test_decodes_complete_valid_jwt(self, jwt):
),
}

def test_decodes_complete_valid_jwt_with_compressed_payload(self, jwt):
example_payload = {"hello": "world"}
example_secret = "secret"
# payload made with the pako (https://nodeca.github.io/pako/) library in Javascript:
# Buffer.from(pako.deflateRaw('{"hello": "world"}')).toString('base64')
example_jwt = (
b"eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9"
b".q1bKSM3JyVeyUlAqzy/KSVGqBQA="
b".08wHYeuh1rJXmcBcMrz6NxmbxAnCQp2rGTKfRNIkxiw="
)
decoded = jwt.decode_complete(example_jwt, example_secret, algorithms=["HS256"])

assert decoded == {
"header": {"alg": "HS256", "typ": "JWT"},
"payload": example_payload,
"signature": (
b"\xd3\xcc\x07a\xeb\xa1\xd6\xb2W\x99\xc0\\2\xbc\xfa7"
b"\x19\x9b\xc4\t\xc2B\x9d\xab\x192\x9fD\xd2$\xc6,"
),
}

def test_decodes_complete_valid_jwt_with_invalid_compressed_payload(self, jwt):
# payload made with the pako (https://nodeca.github.io/pako/) library in Javascript:
# Buffer.from(pako.deflateRaw('{"this is": "not valid" json')).toString('base64')
example_secret = "secret"
example_jwt = (
b"eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9"
b".q1YqycgsVsgsVrJSUMrLL1EoS8zJTFFSyCrOzwMA"
b".SIAxephyoWBJrw+KdBfksJlhdg+mQtm+vjaRXV1qGJ4="
)
with pytest.raises(DecodeError):
jwt.decode_complete(example_jwt, example_secret, algorithms=["HS256"])

def test_load_verify_valid_jwt(self, jwt):
example_payload = {"hello": "world"}
example_secret = "secret"
Expand Down