diff --git a/.travis.yml b/.travis.yml index 736cb9cc..eed4db7e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ # Travis infra requires pinning dist:precise, at least as of 2017-09-01 # detail: https://blog.travis-ci.com/2017-06-21-trusty-updates-2017-Q2-launch -dist: precise +dist: bionic language: python install: - pip install -U setuptools && pip install -U tox codecov tox-travis @@ -8,6 +8,9 @@ script: - tox after_success: - codecov +jobs: + allow_failures: + - python: 3.9-dev matrix: include: # Linting @@ -47,36 +50,57 @@ matrix: - python: 3.6 env: TOXENV=py36-compatibility # CPython 3.7 - # xenial + sudo are currently needed to get 3.7 - # https://github.com/travis-ci/travis-ci/issues/9815 - python: 3.7 env: TOXENV=py37-base - dist: xenial - sudo: true - python: 3.7 env: TOXENV=py37-cryptography-only - dist: xenial - sudo: true - python: 3.7 env: TOXENV=py37-pycryptodome-norsa - dist: xenial - sudo: true - python: 3.7 env: TOXENV=py37-pycrypto-norsa - dist: xenial - sudo: true - python: 3.7 env: TOXENV=py37-compatibility - dist: xenial - sudo: true - # PyPy 3.5 (5.10.1?) - - python: pypy3.5 + # CPython 3.8 + - python: 3.8 + env: TOXENV=py38-base + - python: 3.8 + env: TOXENV=py38-cryptography-only + - python: 3.8 + env: TOXENV=py38-pycryptodome-norsa + - python: 3.8 + env: TOXENV=py38-pycrypto-norsa + - python: 3.8 + env: TOXENV=py38-compatibility + # CPython 3.9 - dev + - python: 3.9-dev + env: TOXENV=py39-base + - python: 3.9-dev + env: TOXENV=py39-cryptography-only + - python: 3.9-dev + env: TOXENV=py39-pycryptodome-norsa + - python: 3.9-dev + env: TOXENV=py39-pycrypto-norsa + - python: 3.9-dev + env: TOXENV=py39-compatibility + # PyPy 2.7 + - python: pypy2 env: TOXENV=pypy-base - - python: pypy3.5 + - python: pypy2 env: TOXENV=pypy-cryptography-only - - python: pypy3.5 + - python: pypy2 env: TOXENV=pypy-pycryptodome-norsa - - python: pypy3.5 + - python: pypy2 env: TOXENV=pypy-pycrypto-norsa - - python: pypy3.5 + - python: pypy2 env: TOXENV=pypy-compatibility + # PyPy 3.x + - python: pypy3 + env: TOXENV=pypy-base + - python: pypy3 + env: TOXENV=pypy3-cryptography-only + - python: pypy3 + env: TOXENV=pypy3-pycryptodome-norsa + - python: pypy3 + env: TOXENV=pypy3-pycrypto-norsa + - python: pypy3 + env: TOXENV=pypy3-compatibility diff --git a/docs/index.rst b/docs/index.rst index 63cd390c..9a1aa676 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -24,6 +24,7 @@ Contents jws/index jwt/index jwk/index + jwe/index APIs @@ -35,6 +36,7 @@ APIs jws/api jwt/api jwk/api + jwe/api Principles diff --git a/docs/jwe/api.rst b/docs/jwe/api.rst new file mode 100644 index 00000000..4086fd8e --- /dev/null +++ b/docs/jwe/api.rst @@ -0,0 +1,6 @@ + +JWE API +^^^^^^^ + +.. automodule:: jose.jwe + :members: \ No newline at end of file diff --git a/docs/jwe/index.rst b/docs/jwe/index.rst new file mode 100644 index 00000000..0a4472fc --- /dev/null +++ b/docs/jwe/index.rst @@ -0,0 +1,71 @@ +JSON Web Encryption +=================== + +JSON Web Encryption (JWE) are used to encrypt a payload and represent it as a +compact URL-safe string. + +Supported Content Encryption Algorithms +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The following algorithms are currently supported. + ++------------------+------------------------------------------------+ +| Encryption Value | Encryption Algorithm, Mode, and Auth Tag | ++==================+================================================+ +| A128CBC_HS256 | AES w/128 bit key in CBC mode w/SHA256 HMAC | ++------------------+------------------------------------------------+ +| A192CBC_HS384 | AES w/128 bit key in CBC mode w/SHA256 HMAC | ++------------------+------------------------------------------------+ +| A256CBC_HS512 | AES w/128 bit key in CBC mode w/SHA256 HMAC | ++------------------+------------------------------------------------+ +| A128GCM | AES w/128 bit key in GCM mode and GCM auth tag | ++------------------+------------------------------------------------+ +| A192GCM | AES w/192 bit key in GCM mode and GCM auth tag | ++------------------+------------------------------------------------+ +| A256GCM | AES w/256 bit key in GCM mode and GCM auth tag | ++------------------+------------------------------------------------+ + +Supported Key Management Algorithms +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The following algorithms are currently supported. + ++-----------------+------------------------------------------------+ +| Algorithm Value | Key Wrap Algorithm | ++=================+================================================+ +| DIR | Direct (no key wrap) | ++-----------------+------------------------------------------------+ +| RSA1_5 | RSAES with PKCS1 v1.5 | ++-----------------+------------------------------------------------+ +| RSA_OAEP | RSAES OAEP using default parameters | ++-----------------+------------------------------------------------+ +| RSA_OAEP_256 | RSAES OAEP using SHA-256 and MGF1 with SHA-256 | ++-----------------+------------------------------------------------+ +| A128KW | AES Key Wrap with default IV using 128-bit key | ++-----------------+------------------------------------------------+ +| A192KW m | AES Key Wrap with default IV using 192-bit key | ++-----------------+------------------------------------------------+ +| A256KW | AES Key Wrap with default IV using 256-bit key | ++-----------------+------------------------------------------------+ + +Examples +^^^^^^^^ + +Encrypting Payloads +------------------- + +.. code:: python + + >>> from jose import jwe + >>> jwe.encrypt('Hello, World!', 'asecret128bitkey', algorithm='dir', encryption='A128GCM') + 'eyJhbGciOiJkaXIiLCJlbmMiOiJBMTI4R0NNIn0..McILMB3dYsNJSuhcDzQshA.OfX9H_mcUpHDeRM4IA.CcnTWqaqxNsjT4eCaUABSg' + + +Decrypting Payloads +-------------------------- + +.. code:: python + + >>> from jose import jwe + >>> jwe.decrypt('eyJhbGciOiJkaXIiLCJlbmMiOiJBMTI4R0NNIn0..McILMB3dYsNJSuhcDzQshA.OfX9H_mcUpHDeRM4IA.CcnTWqaqxNsjT4eCaUABSg', 'asecret128bitkey') + 'Hello, World!' diff --git a/jose/backends/__init__.py b/jose/backends/__init__.py index d1b9fa1a..ccfab6c1 100644 --- a/jose/backends/__init__.py +++ b/jose/backends/__init__.py @@ -1,9 +1,26 @@ +try: + from jose.backends.cryptography_backend import get_random_bytes # noqa: F401 +except ImportError: + try: + from jose.backends.pycrypto_backend import get_random_bytes # noqa: F401 + except ImportError: + from jose.backends.native import get_random_bytes # noqa: F401 try: from jose.backends.cryptography_backend import CryptographyRSAKey as RSAKey # noqa: F401 except ImportError: try: from jose.backends.pycrypto_backend import RSAKey # noqa: F401 + + # time.clock was deprecated in python 3.3 in favor of time.perf_counter + # and removed in python 3.8. pycrypto was never updated for this. If + # time has no clock attribute, let it use perf_counter instead to work + # in 3.8+ + # noinspection PyUnresolvedReferences + import time + if not hasattr(time, "clock"): + time.clock = time.perf_counter + except ImportError: from jose.backends.rsa_backend import RSAKey # noqa: F401 @@ -11,3 +28,18 @@ from jose.backends.cryptography_backend import CryptographyECKey as ECKey # noqa: F401 except ImportError: from jose.backends.ecdsa_backend import ECDSAECKey as ECKey # noqa: F401 + +try: + from jose.backends.cryptography_backend import CryptographyAESKey as AESKey # noqa: F401 +except ImportError: + try: + from jose.backends.pycrypto_backend import AESKey # noqa: F401 + except ImportError: + AESKey = None + +try: + from jose.backends.cryptography_backend import CryptographyHMACKey as HMACKey # noqa: F401 +except ImportError: + from jose.backends.native import HMACKey # noqa: F401 + +from .base import DIRKey # noqa: F401 diff --git a/jose/backends/base.py b/jose/backends/base.py index 37fc2ea3..c69e7d73 100644 --- a/jose/backends/base.py +++ b/jose/backends/base.py @@ -1,3 +1,8 @@ +import six + +from ..utils import base64url_encode + + class Key(object): """ A simple interface for implementing JWK keys. @@ -19,3 +24,67 @@ def to_pem(self): def to_dict(self): raise NotImplementedError() + + def encrypt(self, plain_text, aad=None): + """ + Encrypt the plain text and generate an auth tag if appropriate + + Args: + plain_text (bytes): Data to encrypt + aad (bytes, optional): Authenticated Additional Data if key's algorithm supports auth mode + + Returns: + (bytes, bytes, bytes): IV, cipher text, and auth tag + """ + raise NotImplementedError() + + def decrypt(self, cipher_text, iv=None, aad=None, tag=None): + """ + Decrypt the cipher text and validate the auth tag if present + Args: + cipher_text (bytes): Cipher text to decrypt + iv (bytes): IV if block mode + aad (bytes): Additional Authenticated Data to verify if auth mode + tag (bytes): Authentication tag if auth mode + + Returns: + bytes: Decrypted value + """ + raise NotImplementedError() + + def wrap_key(self, key_data): + """ + Wrap the the plain text key data + + Args: + key_data (bytes): Key data to wrap + + Returns: + bytes: Wrapped key + """ + raise NotImplementedError() + + def unwrap_key(self, wrapped_key): + """ + Unwrap the the wrapped key data + + Args: + wrapped_key (bytes): Wrapped key data to unwrap + + Returns: + bytes: Unwrapped key + """ + raise NotImplementedError() + + +class DIRKey(Key): + def __init__(self, key_data, algorithm): + self._key = six.ensure_binary(key_data) + self._alg = algorithm + + def to_dict(self): + return { + 'alg': self._alg, + 'kty': 'oct', + 'k': base64url_encode(self._key), + } diff --git a/jose/backends/cryptography_backend.py b/jose/backends/cryptography_backend.py index 8a63157b..2533f2e2 100644 --- a/jose/backends/cryptography_backend.py +++ b/jose/backends/cryptography_backend.py @@ -1,23 +1,53 @@ from __future__ import division import math +import warnings import six -from jose.backends.base import Key -from jose.utils import base64_to_long, long_to_base64 -from jose.constants import ALGORITHMS -from jose.exceptions import JWKError +from .base import Key +from ..utils import base64_to_long, long_to_base64, base64url_decode, base64url_encode +from ..constants import ALGORITHMS +from ..exceptions import JWKError, JWEError -from cryptography.exceptions import InvalidSignature +from cryptography.exceptions import InvalidSignature, InvalidTag from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.bindings.openssl.binding import Binding +from cryptography.hazmat.primitives import hashes, serialization, hmac from cryptography.hazmat.primitives.asymmetric import ec, rsa, padding from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature, encode_dss_signature +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, aead, modes +from cryptography.hazmat.primitives.keywrap import aes_key_wrap, aes_key_unwrap, InvalidUnwrap +from cryptography.hazmat.primitives.padding import PKCS7 from cryptography.hazmat.primitives.serialization import load_pem_private_key, load_pem_public_key from cryptography.utils import int_from_bytes, int_to_bytes from cryptography.x509 import load_pem_x509_certificate +_binding = None + + +def get_random_bytes(num_bytes): + """ + Get random bytes + + Currently, Cryptography returns OS random bytes. If you want OpenSSL + generated random bytes, you'll have to switch the RAND engine after + initializing the OpenSSL backend + Args: + num_bytes (int): Number of random bytes to generate and return + Returns: + bytes: Random bytes + """ + global _binding + + if _binding is None: + _binding = Binding() + + buf = _binding.ffi.new("char[]", num_bytes) + _binding.lib.RAND_bytes(buf, num_bytes) + rand_bytes = _binding.ffi.buffer(buf, num_bytes)[:] + return rand_bytes + class CryptographyECKey(Key): SHA256 = hashes.SHA256 @@ -196,6 +226,10 @@ class CryptographyRSAKey(Key): SHA384 = hashes.SHA384 SHA512 = hashes.SHA512 + RSA1_5 = padding.PKCS1v15() + RSA_OAEP = padding.OAEP(padding.MGF1(hashes.SHA1()), hashes.SHA1(), None) + RSA_OAEP_256 = padding.OAEP(padding.MGF1(hashes.SHA256()), hashes.SHA256(), None) + def __init__(self, key, algorithm, cryptography_backend=default_backend): if algorithm not in ALGORITHMS.RSA: raise JWKError('hash_alg: %s is not a valid hash algorithm' % algorithm) @@ -207,6 +241,12 @@ def __init__(self, key, algorithm, cryptography_backend=default_backend): }.get(algorithm) self._algorithm = algorithm + self.padding = { + ALGORITHMS.RSA1_5: self.RSA1_5, + ALGORITHMS.RSA_OAEP: self.RSA_OAEP, + ALGORITHMS.RSA_OAEP_256: self.RSA_OAEP_256 + }.get(algorithm) + self.cryptography_backend = cryptography_backend # if it conforms to RSAPublicKey interface @@ -294,8 +334,12 @@ def sign(self, msg): return signature def verify(self, msg, sig): + if not self.is_public(): + warnings.warn("Attempting to verify a message with a private key. " + "This is not recommended.") + try: - self.prepared_key.verify( + self.public_key().prepared_key.verify( sig, msg, padding.PKCS1v15(), @@ -364,3 +408,226 @@ def to_dict(self): }) return data + + def wrap_key(self, key_data): + try: + wrapped_key = self.prepared_key.encrypt(key_data, self.padding) + except Exception as e: + raise JWEError(e) + + return wrapped_key + + def unwrap_key(self, wrapped_key): + try: + unwrapped_key = self.prepared_key.decrypt( + wrapped_key, + self.padding + ) + return unwrapped_key + except Exception as e: + raise JWEError(e) + + +class CryptographyAESKey(Key): + KEY_128 = (ALGORITHMS.A128GCM, ALGORITHMS.A128GCMKW, ALGORITHMS.A128KW, + ALGORITHMS.A128CBC) + KEY_192 = (ALGORITHMS.A192GCM, ALGORITHMS.A192GCMKW, ALGORITHMS.A192KW, + ALGORITHMS.A192CBC) + KEY_256 = (ALGORITHMS.A256GCM, ALGORITHMS.A256GCMKW, ALGORITHMS.A256KW, + ALGORITHMS.A128CBC_HS256, ALGORITHMS.A256CBC) + KEY_384 = (ALGORITHMS.A192CBC_HS384,) + KEY_512 = (ALGORITHMS.A256CBC_HS512,) + + AES_KW_ALGS = (ALGORITHMS.A128KW, ALGORITHMS.A192KW, ALGORITHMS.A256KW) + + MODES = { + ALGORITHMS.A128GCM: modes.GCM, + ALGORITHMS.A192GCM: modes.GCM, + ALGORITHMS.A256GCM: modes.GCM, + ALGORITHMS.A128CBC_HS256: modes.CBC, + ALGORITHMS.A192CBC_HS384: modes.CBC, + ALGORITHMS.A256CBC_HS512: modes.CBC, + ALGORITHMS.A128CBC: modes.CBC, + ALGORITHMS.A192CBC: modes.CBC, + ALGORITHMS.A256CBC: modes.CBC, + ALGORITHMS.A128GCMKW: modes.GCM, + ALGORITHMS.A192GCMKW: modes.GCM, + ALGORITHMS.A256GCMKW: modes.GCM, + ALGORITHMS.A128KW: None, + ALGORITHMS.A192KW: None, + ALGORITHMS.A256KW: None + } + + def __init__(self, key, algorithm): + if algorithm not in ALGORITHMS.AES: + raise JWKError('%s is not a valid AES algorithm' % algorithm) + if algorithm not in ALGORITHMS.SUPPORTED.union(ALGORITHMS.AES_PSEUDO): + raise JWKError('%s is not a supported algorithm' % algorithm) + + self._algorithm = algorithm + self._mode = self.MODES.get(self._algorithm) + + if algorithm in self.KEY_128 and len(key) != 16: + raise JWKError("Key must be 128 bit for alg {}".format(algorithm)) + elif algorithm in self.KEY_192 and len(key) != 24: + raise JWKError("Key must be 192 bit for alg {}".format(algorithm)) + elif algorithm in self.KEY_256 and len(key) != 32: + raise JWKError("Key must be 256 bit for alg {}".format(algorithm)) + elif algorithm in self.KEY_384 and len(key) != 48: + raise JWKError("Key must be 384 bit for alg {}".format(algorithm)) + elif algorithm in self.KEY_512 and len(key) != 64: + raise JWKError("Key must be 512 bit for alg {}".format(algorithm)) + + self._key = key + + def to_dict(self): + data = { + 'alg': self._algorithm, + 'kty': 'oct', + 'k': base64url_encode(self._key) + } + return data + + def encrypt(self, plain_text, aad=None): + plain_text = six.ensure_binary(plain_text) + try: + iv = get_random_bytes(algorithms.AES.block_size//8) + mode = self._mode(iv) + if mode.name == "GCM": + cipher = aead.AESGCM(self._key) + cipher_text_and_tag = cipher.encrypt(iv, plain_text, aad) + cipher_text = cipher_text_and_tag[:len(cipher_text_and_tag) - 16] + auth_tag = cipher_text_and_tag[-16:] + else: + cipher = Cipher(algorithms.AES(self._key), mode, + backend=default_backend()) + encryptor = cipher.encryptor() + padder = PKCS7(algorithms.AES.block_size).padder() + padded_data = padder.update(plain_text) + padded_data += padder.finalize() + cipher_text = encryptor.update(padded_data) + encryptor.finalize() + auth_tag = None + return iv, cipher_text, auth_tag + except Exception as e: + raise JWEError(e) + + def decrypt(self, cipher_text, iv=None, aad=None, tag=None): + cipher_text = six.ensure_binary(cipher_text) + try: + iv = six.ensure_binary(iv) + mode = self._mode(iv) + if mode.name == "GCM": + if tag is None: + raise ValueError("tag cannot be None") + cipher = aead.AESGCM(self._key) + cipher_text_and_tag = cipher_text + tag + try: + plain_text = cipher.decrypt(iv, cipher_text_and_tag, aad) + except InvalidTag: + raise JWEError("Invalid JWE Auth Tag") + else: + cipher = Cipher(algorithms.AES(self._key), mode, + backend=default_backend()) + decryptor = cipher.decryptor() + padded_plain_text = decryptor.update(cipher_text) + padded_plain_text += decryptor.finalize() + unpadder = PKCS7(algorithms.AES.block_size).unpadder() + plain_text = unpadder.update(padded_plain_text) + plain_text += unpadder.finalize() + + return plain_text + except Exception as e: + raise JWEError(e) + + def wrap_key(self, key_data): + key_data = six.ensure_binary(key_data) + cipher_text = aes_key_wrap(self._key, key_data, default_backend()) + return cipher_text # IV, cipher text, auth tag + + def unwrap_key(self, wrapped_key): + wrapped_key = six.ensure_binary(wrapped_key) + try: + plain_text = aes_key_unwrap(self._key, wrapped_key, default_backend()) + except InvalidUnwrap as cause: + raise JWEError(cause) + return plain_text + + +class CryptographyHMACKey(Key): + """ + Performs signing and verification operations using HMAC + and the specified hash function. + """ + + ALG_MAP = { + ALGORITHMS.HS256: hashes.SHA256(), + ALGORITHMS.HS384: hashes.SHA384(), + ALGORITHMS.HS512: hashes.SHA512() + } + + def __init__(self, key, algorithm): + if algorithm not in ALGORITHMS.HMAC: + raise JWKError('hash_alg: %s is not a valid hash algorithm' % algorithm) + self._algorithm = algorithm + self._hash_alg = self.ALG_MAP.get(algorithm) + + if isinstance(key, dict): + self.prepared_key = self._process_jwk(key) + return + + if not isinstance(key, six.string_types) and not isinstance(key, bytes): + raise JWKError('Expecting a string- or bytes-formatted key.') + + if isinstance(key, six.text_type): + key = key.encode('utf-8') + + invalid_strings = [ + b'-----BEGIN PUBLIC KEY-----', + b'-----BEGIN RSA PUBLIC KEY-----', + b'-----BEGIN CERTIFICATE-----', + b'ssh-rsa' + ] + + if any(string_value in key for string_value in invalid_strings): + raise JWKError( + 'The specified key is an asymmetric key or x509 certificate and' + ' should not be used as an HMAC secret.') + + self.prepared_key = key + + def _process_jwk(self, jwk_dict): + if not jwk_dict.get('kty') == 'oct': + raise JWKError("Incorrect key type. Expected: 'oct', Received: %s" % jwk_dict.get('kty')) + + k = jwk_dict.get('k') + k = k.encode('utf-8') + k = bytes(k) + k = base64url_decode(k) + + return k + + def to_dict(self): + return { + 'alg': self._algorithm, + 'kty': 'oct', + 'k': base64url_encode(self.prepared_key).decode('ASCII'), + } + + def sign(self, msg): + msg = six.ensure_binary(msg) + h = hmac.HMAC(self.prepared_key, self._hash_alg, backend=default_backend()) + h.update(msg) + signature = h.finalize() + return signature + + def verify(self, msg, sig): + msg = six.ensure_binary(msg) + sig = six.ensure_binary(sig) + h = hmac.HMAC(self.prepared_key, self._hash_alg, backend=default_backend()) + h.update(msg) + try: + h.verify(sig) + verified = True + except InvalidSignature: + verified = False + return verified diff --git a/jose/backends/native.py b/jose/backends/native.py new file mode 100644 index 00000000..32862150 --- /dev/null +++ b/jose/backends/native.py @@ -0,0 +1,80 @@ +import hashlib +import hmac +import os + +import six + +from jose.backends.base import Key +from jose.constants import ALGORITHMS +from jose.exceptions import JWKError +from jose.utils import base64url_decode, base64url_encode + + +def get_random_bytes(num_bytes): + return bytes(os.urandom(num_bytes)) + + +class HMACKey(Key): + """ + Performs signing and verification operations using HMAC + and the specified hash function. + """ + HASHES = { + ALGORITHMS.HS256: hashlib.sha256, + ALGORITHMS.HS384: hashlib.sha384, + ALGORITHMS.HS512: hashlib.sha512 + } + + def __init__(self, key, algorithm): + if algorithm not in ALGORITHMS.HMAC: + raise JWKError('hash_alg: %s is not a valid hash algorithm' % algorithm) + self._algorithm = algorithm + self._hash_alg = self.HASHES.get(algorithm) + + if isinstance(key, dict): + self.prepared_key = self._process_jwk(key) + return + + if not isinstance(key, six.string_types) and not isinstance(key, bytes): + raise JWKError('Expecting a string- or bytes-formatted key.') + + if isinstance(key, six.text_type): + key = key.encode('utf-8') + + invalid_strings = [ + b'-----BEGIN PUBLIC KEY-----', + b'-----BEGIN RSA PUBLIC KEY-----', + b'-----BEGIN CERTIFICATE-----', + b'ssh-rsa' + ] + + if any(string_value in key for string_value in invalid_strings): + raise JWKError( + 'The specified key is an asymmetric key or x509 certificate and' + ' should not be used as an HMAC secret.') + + self.prepared_key = key + + def _process_jwk(self, jwk_dict): + if not jwk_dict.get('kty') == 'oct': + raise JWKError("Incorrect key type. Expected: 'oct', Received: %s" % jwk_dict.get('kty')) + + k = jwk_dict.get('k') + k = k.encode('utf-8') + k = bytes(k) + k = base64url_decode(k) + + return k + + def sign(self, msg): + return hmac.new(self.prepared_key, msg, self._hash_alg).digest() + + def verify(self, msg, sig): + return hmac.compare_digest(sig, self.sign(msg)) + + def to_dict(self): + return { + 'alg': self._algorithm, + 'kty': 'oct', + 'k': base64url_encode(self.prepared_key).decode('ASCII'), + } diff --git a/jose/backends/pycrypto_backend.py b/jose/backends/pycrypto_backend.py index b8f7b678..94aa7275 100644 --- a/jose/backends/pycrypto_backend.py +++ b/jose/backends/pycrypto_backend.py @@ -1,4 +1,5 @@ from base64 import b64encode +from binascii import unhexlify import six import warnings @@ -6,17 +7,20 @@ import Crypto.Hash.SHA256 import Crypto.Hash.SHA384 import Crypto.Hash.SHA512 +from Crypto import Random from Crypto.PublicKey import RSA -from Crypto.Signature import PKCS1_v1_5 +from Crypto.Signature import PKCS1_v1_5 as PKCS1_v1_5_Signature +from Crypto.Cipher import PKCS1_v1_5 as PKCS1_v1_5_Cipher +from Crypto.Cipher import PKCS1_OAEP, AES from Crypto.Util.asn1 import DerSequence -from jose.backends.base import Key -from jose.backends._asn1 import rsa_public_key_pkcs8_to_pkcs1 -from jose.utils import base64_to_long, long_to_base64 -from jose.constants import ALGORITHMS -from jose.exceptions import JWKError -from jose.utils import base64url_decode +from .base import Key +from ._asn1 import rsa_public_key_pkcs8_to_pkcs1 +from ..utils import base64_to_long, long_to_base64 +from ..constants import ALGORITHMS +from ..exceptions import JWKError, JWEError, JWEAlgorithmUnsupportedError +from ..utils import base64url_decode # We default to using PyCryptodome, however, if PyCrypto is installed, it is @@ -27,6 +31,15 @@ else: _RSAKey = RSA._RSAobj +if not hasattr(AES, "MODE_GCM"): + # PyCrypto does not support GCM mode + for gcm_alg in ALGORITHMS.GCM: + ALGORITHMS.SUPPORTED.remove(gcm_alg) + + +def get_random_bytes(num_bytes): + return bytes(Random.new().read(num_bytes)) + def _der_to_pem(der_key, marker): """ @@ -54,6 +67,7 @@ class RSAKey(Key): SHA256 = Crypto.Hash.SHA256 SHA384 = Crypto.Hash.SHA384 SHA512 = Crypto.Hash.SHA512 + SHA1 = Crypto.Hash.SHA if hasattr(Crypto.Hash, "SHA") else Crypto.Hash.SHA1 def __init__(self, key, algorithm): @@ -63,7 +77,10 @@ def __init__(self, key, algorithm): self.hash_alg = { ALGORITHMS.RS256: self.SHA256, ALGORITHMS.RS384: self.SHA384, - ALGORITHMS.RS512: self.SHA512 + ALGORITHMS.RS512: self.SHA512, + ALGORITHMS.RSA1_5: self.SHA1, + ALGORITHMS.RSA_OAEP: self.SHA1, + ALGORITHMS.RSA_OAEP_256: self.SHA256, }.get(algorithm) self._algorithm = algorithm @@ -143,7 +160,7 @@ def _process_cert(self, key): def sign(self, msg): try: - return PKCS1_v1_5.new(self.prepared_key).sign(self.hash_alg.new(msg)) + return PKCS1_v1_5_Signature.new(self.prepared_key).sign(self.hash_alg.new(msg)) except Exception as e: raise JWKError(e) @@ -152,7 +169,7 @@ def verify(self, msg, sig): warnings.warn("Attempting to verify a message with a private key. " "This is not recommended.") try: - return PKCS1_v1_5.new(self.prepared_key).verify(self.hash_alg.new(msg), sig) + return PKCS1_v1_5_Signature.new(self.prepared_key).verify(self.hash_alg.new(msg), sig) except Exception: return False @@ -214,3 +231,300 @@ def to_dict(self): }) return data + + def wrap_key(self, key_data): + try: + if self._algorithm == ALGORITHMS.RSA1_5: + cipher = PKCS1_v1_5_Cipher.new(self.prepared_key) + else: + cipher = PKCS1_OAEP.new(self.prepared_key, self.hash_alg) + wrapped_key = cipher.encrypt(key_data) + return wrapped_key + except Exception as e: + raise JWKError(e) + + def unwrap_key(self, wrapped_key): + try: + if self._algorithm == ALGORITHMS.RSA1_5: + sentinel = Random.new().read(32) + cipher = PKCS1_v1_5_Cipher.new(self.prepared_key) + plain_text = cipher.decrypt(wrapped_key, sentinel) + else: + cipher = PKCS1_OAEP.new(self.prepared_key, self.hash_alg) + plain_text = cipher.decrypt(wrapped_key) + return plain_text + except Exception as e: + raise JWEError(e) + + +class AESKey(Key): + ALG_128 = (ALGORITHMS.A128GCM, ALGORITHMS.A128CBC_HS256, ALGORITHMS.A128GCMKW, ALGORITHMS.A128KW) + ALG_192 = (ALGORITHMS.A192GCM, ALGORITHMS.A192CBC_HS384, ALGORITHMS.A192GCMKW, ALGORITHMS.A192KW) + ALG_256 = (ALGORITHMS.A256GCM, ALGORITHMS.A256CBC_HS512, ALGORITHMS.A256GCMKW, ALGORITHMS.A256KW) + + AES_KW_ALGS = (ALGORITHMS.A128KW, ALGORITHMS.A192KW, ALGORITHMS.A256KW) + + MODES = { + ALGORITHMS.A128CBC_HS256: AES.MODE_CBC, + ALGORITHMS.A192CBC_HS384: AES.MODE_CBC, + ALGORITHMS.A256CBC_HS512: AES.MODE_CBC, + ALGORITHMS.A128CBC: AES.MODE_CBC, + ALGORITHMS.A192CBC: AES.MODE_CBC, + ALGORITHMS.A256CBC: AES.MODE_CBC, + ALGORITHMS.A128KW: AES.MODE_ECB, + ALGORITHMS.A192KW: AES.MODE_ECB, + ALGORITHMS.A256KW: AES.MODE_ECB + } + if hasattr(AES, "MODE_GCM"): + # pycrypto does not support GCM. pycryptdome does + MODES.update({ + ALGORITHMS.A128GCMKW: AES.MODE_GCM, + ALGORITHMS.A192GCMKW: AES.MODE_GCM, + ALGORITHMS.A256GCMKW: AES.MODE_GCM, + ALGORITHMS.A128GCM: AES.MODE_GCM, + ALGORITHMS.A192GCM: AES.MODE_GCM, + ALGORITHMS.A256GCM: AES.MODE_GCM, + }) + + def __init__(self, key, algorithm): + if algorithm not in ALGORITHMS.AES: + raise JWKError('%s is not a valid AES algorithm' % algorithm) + if algorithm not in ALGORITHMS.SUPPORTED.union(ALGORITHMS.AES_PSEUDO): + raise JWKError('%s is not a supported algorithm' % algorithm) + + self._algorithm = algorithm + self._mode = self.MODES.get(self._algorithm) + if self._mode is None: + raise JWEAlgorithmUnsupportedError("AES Mode is not supported by cryptographic library") + + if algorithm in self.ALG_128 and len(key) != 16: + raise JWKError("Key must be 128 bits for alg {}".format(algorithm)) + elif algorithm in self.ALG_192 and len(key) != 24: + raise JWKError("Key must be 192 bits for alg {}".format(algorithm)) + elif algorithm in self.ALG_256 and len(key) != 32: + raise JWKError("Key must be 256 bits for alg {}".format(algorithm)) + + self._key = six.ensure_binary(key) + + def to_dict(self): + data = { + 'alg': self._algorithm, + 'kty': 'oct', + 'k': self._key + } + return data + + def encrypt(self, plain_text, aad=None): + plain_text = six.ensure_binary(plain_text) + try: + iv = get_random_bytes(AES.block_size) + cipher = AES.new(self._key, self._mode, iv) + if self._mode == AES.MODE_CBC: + padded_plain_text = self._pad(AES.block_size, plain_text) + cipher_text = cipher.encrypt(padded_plain_text) + auth_tag = None + else: + cipher.update(aad) + cipher_text, auth_tag = cipher.encrypt_and_digest(plain_text) + return iv, cipher_text, auth_tag + except Exception as e: + raise JWEError(e) + + def decrypt(self, cipher_text, iv=None, aad=None, tag=None): + cipher_text = six.ensure_binary(cipher_text) + try: + cipher = AES.new(self._key, self._mode, iv) + if self._mode == AES.MODE_CBC: + padded_plain_text = cipher.decrypt(cipher_text) + plain_text = self._unpad(padded_plain_text) + else: + cipher.update(aad) + try: + plain_text = cipher.decrypt_and_verify(cipher_text, tag) + except ValueError: + raise JWEError("Invalid JWE Auth Tag") + + return plain_text + except Exception as e: + raise JWEError(e) + + DEFAULT_IV = unhexlify("A6A6A6A6A6A6A6A6") + + def wrap_key(self, key_data): + key_data = six.ensure_binary(key_data) + + # AES(K, W) Encrypt W using the AES codebook with key K + def aes(k_, w_): + return AES.new(k_, AES.MODE_ECB).encrypt(w_) + + # MSB(j, W) Return the most significant j bits of W + msb = self._most_significant_bits + + # LSB(j, W) Return the least significant j bits of W + lsb = self._least_significant_bits + + # B1 ^ B2 The bitwise exclusive or (XOR) of B1 and B2 + # B1 | B2 Concatenate B1 and B2 + + # K The key-encryption key K + k = self._key + + # n The number of 64-bit key data blocks + n = len(key_data) // 8 + + # P[i] The ith plaintext key data block + p = [None] + [key_data[i * 8:i * 8 + 8] for i in range(n)] # Split into 8 byte blocks and prepend the 0th item + + # C[i] The ith ciphertext data block + c = [None] + [None for _ in range(n)] # Initialize c with n items and prepend the 0th item + + # A The 64-bit integrity check register + a = None + + # R[i] An array of 64-bit registers where + # i = 0, 1, 2, ..., n + r = [None] + [None for _ in range(n)] # Initialize r with n items and prepend the 0th item + + # A[t], R[i][t] The contents of registers A and R[i] after encryption + # step t. + + # IV The 64-bit initial value used during the wrapping + # process. + iv = self.DEFAULT_IV + + # 1) Initialize variables. + + # Set A = IV, an initial value + a = iv + # For i = 1 to n + for i in range(1, n + 1): + # R[i] = P[i] + r[i] = p[i] + + # 2) Calculate intermediate values. + # For j = 0 to 5 + for j in range(6): + # For i=1 to n + for i in range(1, n + 1): + # B = AES(K, A | R[i]) + b = aes(k, a + r[i]) + # A = MSB(64, B) ^ t where t = (n*j)+i + t = (n * j) + i + a = msb(64, b) + a = a[:7] + six.int2byte(six.byte2int([a[7]]) ^ t) + # R[i] = LSB(64, B) + r[i] = lsb(64, b) + + # 3) Output the results. + # Set C[0] = A + c[0] = a + # For i = 1 to n + for i in range(1, n + 1): + # C[i] = R[i] + c[i] = r[i] + + cipher_text = b"".join(c) # Join the chunks to return + return cipher_text # IV, cipher text, auth tag + + def unwrap_key(self, wrapped_key): + wrapped_key = six.ensure_binary(wrapped_key) + + # AES-1(K, W) Decrypt W using the AES codebook with key K + def aes_1(k_, w_): + return AES.new(k_, AES.MODE_ECB).decrypt(w_) + + # MSB(j, W) Return the most significant j bits of W + msb = self._most_significant_bits + + # LSB(j, W) Return the least significant j bits of W + lsb = self._least_significant_bits + + # B1 ^ B2 The bitwise exclusive or (XOR) of B1 and B2 + # B1 | B2 Concatenate B1 and B2 + + # K The key-encryption key K + k = self._key + + # n The number of 64-bit key data blocks + n = len(wrapped_key) // 8 - 1 + + # P[i] The ith plaintext key data block + p = [None] + [None] * n # Initialize p with n items and prepend the 0th item + + # C[i] The ith ciphertext data block + c = [wrapped_key[i*8:i*8+8] for i in range(n + 1)] # Split ciphertext into 8 byte chunks + + # A The 64-bit integrity check register + a = None + + # R[i] An array of 64-bit registers where + # i = 0, 1, 2, ..., n + r = [None] + [None] * n # Initialize r with n items and prepend the 0th item + + # A[t], R[i][t] The contents of registers A and R[i] after encryption + # step t. + + # 1) Initialize variables. + # Set A = C[0] + a = c[0] + # For i = 1 to n + for i in range(1, n + 1): + # R[i] = C[i] + r[i] = c[i] + + # 2) Compute intermediate values. + # For j = 5 to 0 + for j in range(5, -1, -1): + # For i = n to 1 + for i in range(n, 0, -1): + # B = AES-1(K, (A ^ t) | R[i]) where t = n*j+i + t = n * j + i + a = a[:7] + six.int2byte(six.byte2int([a[7]]) ^ t) + b = aes_1(k, a + r[i]) + # A = MSB(64, B) + a = msb(64, b) + # R[i] = LSB(64, B) + r[i] = lsb(64, b) + + # 3) Output results. + # If A is an appropriate initial value (see 2.2.3), + if a == self.DEFAULT_IV: + # Then + # For i = 1 to n + for i in range(1, n + 1): + # P[i] = R[i] + p[i] = r[i] + # Else + else: + # Return an error + raise JWEError("Invalid AES Keywrap") + + return b"".join(p[1:]) # Join the chunks and return + + @staticmethod + def _most_significant_bits(number_of_bits, _bytes): + number_of_bytes = number_of_bits // 8 + msb = _bytes[:number_of_bytes] + return msb + + @staticmethod + def _least_significant_bits(number_of_bits, _bytes): + number_of_bytes = number_of_bits // 8 + lsb = _bytes[-number_of_bytes:] + return lsb + + @staticmethod + def _pad(block_size, unpadded): + padding_bytes = block_size - len(unpadded) % block_size + padding = bytes(bytearray([padding_bytes]) * padding_bytes) + return unpadded + padding + + @staticmethod + def _unpad(padded): + padded = six.ensure_binary(padded) + padding_byte = padded[-1] + if isinstance(padded, six.string_types): + padding_byte = ord(padding_byte) + if padded[-padding_byte:] != bytearray([padding_byte]) * padding_byte: + raise ValueError("Invalid padding!") + return padded[:-padding_byte] diff --git a/jose/backends/rsa_backend.py b/jose/backends/rsa_backend.py index 79862a3b..160d3c0e 100644 --- a/jose/backends/rsa_backend.py +++ b/jose/backends/rsa_backend.py @@ -7,6 +7,7 @@ import rsa as pyrsa import rsa.pem as pyrsa_pem +from rsa import DecryptionError from jose.backends.base import Key from jose.backends._asn1 import ( @@ -15,9 +16,10 @@ rsa_public_key_pkcs1_to_pkcs8, ) from jose.constants import ALGORITHMS -from jose.exceptions import JWKError +from jose.exceptions import JWKError, JWEError from jose.utils import base64_to_long, long_to_base64 +ALGORITHMS.SUPPORTED.remove(ALGORITHMS.RSA_OAEP) # RSA OAEP not supported LEGACY_INVALID_PKCS8_RSA_HEADER = binascii.unhexlify( "30" # sequence @@ -127,6 +129,9 @@ def __init__(self, key, algorithm): if algorithm not in ALGORITHMS.RSA: raise JWKError('hash_alg: %s is not a valid hash algorithm' % algorithm) + if algorithm in ALGORITHMS.RSA_KW and algorithm != ALGORITHMS.RSA1_5: + raise JWKError('alg: %s is not supported by the RSA backend' % algorithm) + self.hash_alg = { ALGORITHMS.RS256: self.SHA256, ALGORITHMS.RS384: self.SHA384, @@ -266,3 +271,17 @@ def to_dict(self): }) return data + + def wrap_key(self, key_data): + if not self.is_public(): + warnings.warn("Attempting to encrypt a message with a private key." + " This is not recommended.") + wrapped_key = pyrsa.encrypt(key_data, self._prepared_key) + return wrapped_key + + def unwrap_key(self, wrapped_key): + try: + unwrapped_key = pyrsa.decrypt(wrapped_key, self._prepared_key) + except DecryptionError as e: + raise JWEError(e) + return unwrapped_key diff --git a/jose/constants.py b/jose/constants.py index eb146549..bd2fc99b 100644 --- a/jose/constants.py +++ b/jose/constants.py @@ -2,6 +2,7 @@ class Algorithms(object): + # DS Algorithms NONE = 'none' HS256 = 'HS256' HS384 = 'HS384' @@ -13,13 +14,65 @@ class Algorithms(object): ES384 = 'ES384' ES512 = 'ES512' + # Content Encryption Algorithms + A128CBC_HS256 = 'A128CBC-HS256' + A192CBC_HS384 = 'A192CBC-HS384' + A256CBC_HS512 = 'A256CBC-HS512' + A128GCM = "A128GCM" + A192GCM = "A192GCM" + A256GCM = "A256GCM" + + # Pseudo algorithm for encryption + A128CBC = "A128CBC" + A192CBC = "A192CBC" + A256CBC = "A256CBC" + + # CEK Encryption Algorithms + DIR = 'dir' + RSA1_5 = 'RSA1_5' + RSA_OAEP = 'RSA-OAEP' + RSA_OAEP_256 = 'RSA-OAEP-256' + A128KW = 'A128KW' + A192KW = 'A192KW' + A256KW = 'A256KW' + ECDH_ES = 'ECDH-ES' + ECDH_ES_A128KW = 'ECDH-ES+A128KW' + ECDH_ES_A192KW = 'ECDH-ES+A192KW' + ECDH_ES_A256KW = 'ECDH-ES+A256KW' + A128GCMKW = 'A128GCMKW' + A192GCMKW = 'A192GCMKW' + A256GCMKW = 'A256GCMKW' + PBES2_HS256_A128KW = 'PBES2-HS256+A128KW' + PBES2_HS384_A192KW = 'PBES2-HS384+A192KW' + PBES2_HS512_A256KW = 'PBES2-HS512+A256KW' + + # Compression Algorithms + DEF = 'DEF' + HMAC = {HS256, HS384, HS512} - RSA = {RS256, RS384, RS512} - EC = {ES256, ES384, ES512} + RSA_DS = {RS256, RS384, RS512} + RSA_KW = {RSA1_5, RSA_OAEP, RSA_OAEP_256} + RSA = RSA_DS.union(RSA_KW) + EC_DS = {ES256, ES384, ES512} + EC_KW = {ECDH_ES, ECDH_ES_A128KW, ECDH_ES_A192KW, ECDH_ES_A256KW} + EC = EC_DS.union(EC_KW) + AES_PSEUDO = {A128CBC, A192CBC, A256CBC, A128GCM, A192GCM, A256GCM} + AES_JWE_ENC = {A128CBC_HS256, A192CBC_HS384, A256CBC_HS512, + A128GCM, A192GCM, A256GCM} + AES_ENC = AES_JWE_ENC.union(AES_PSEUDO) + AES_KW = {A128KW, A192KW, A256KW} + AEC_GCM_KW = {A128GCMKW, A192GCMKW, A256GCMKW} + AES = AES_ENC.union(AES_KW) + PBES2_KW = {PBES2_HS256_A128KW, PBES2_HS384_A192KW, PBES2_HS512_A256KW} + + HMAC_AUTH_TAG = {A128CBC_HS256, A192CBC_HS384, A256CBC_HS512} + GCM = {A128GCM, A192GCM, A256GCM} - SUPPORTED = HMAC.union(RSA).union(EC) + SUPPORTED = HMAC.union(RSA_DS).union(EC_DS).union([DIR]) \ + .union(AES_JWE_ENC).union(RSA_KW).union(AES_KW) - ALL = SUPPORTED.union([NONE]) + ALL = SUPPORTED.union([NONE]).union(AEC_GCM_KW).union(EC_KW) \ + .union(PBES2_KW) HASHES = { HS256: hashlib.sha256, @@ -37,3 +90,12 @@ class Algorithms(object): ALGORITHMS = Algorithms() + + +class Zips(object): + DEF = "DEF" + NONE = None + SUPPORTED = {DEF, NONE} + + +ZIPS = Zips() diff --git a/jose/exceptions.py b/jose/exceptions.py index 1bbb9e96..5e6c874f 100644 --- a/jose/exceptions.py +++ b/jose/exceptions.py @@ -30,3 +30,28 @@ class ExpiredSignatureError(JWTError): class JWKError(JOSEError): pass + + +class JWEError(JOSEError): + """ Base error for all JWE errors """ + pass + + +class JWEParseError(JWEError): + """ Could not parse the JWE string provided """ + pass + + +class JWEInvalidAuth(JWEError): + """ + The authentication tag did not match the protected sections of the + JWE string provided + """ + pass + + +class JWEAlgorithmUnsupportedError(JWEError): + """ + The JWE algorithm is not supported by the backend + """ + pass diff --git a/jose/jwe.py b/jose/jwe.py new file mode 100644 index 00000000..cdc65ec0 --- /dev/null +++ b/jose/jwe.py @@ -0,0 +1,607 @@ +import binascii +import json +import zlib +from struct import pack + +import six + +try: + from collections.abc import Mapping # Python 3 +except ImportError: + from collections import Mapping # Python 2, will be deprecated in Python 3.8 + +from .backends import get_random_bytes +from .constants import ALGORITHMS, ZIPS +from .exceptions import JWEParseError, JWEError +from .utils import base64url_decode, base64url_encode +from . import jwk + + +def encrypt(plaintext, key, encryption=ALGORITHMS.A256GCM, + algorithm=ALGORITHMS.DIR, zip=None, cty=None, kid=None): + """Encrypts plaintext and returns a JWE cmpact serialization string. + + Args: + plaintext (bytes): A bytes object to encrypt + key (str or dict): The key(s) to use for encrypting the content. Can be + individual JWK or JWK set. + encryption (str, optional): The content encryption algorithm used to + perform authenticated encryption on the plaintext to produce the + ciphertext and the Authentication Tag. Defaults to A256GCM. + algorithm (str, optional): The cryptographic algorithm used + to encrypt or determine the value of the CEK. Defaults to dir. + zip (str, optional): The compression algorithm) applied to the + plaintext before encryption. Defaults to None. + cty (str, optional): The media type for the secured content. + See http://www.iana.org/assignments/media-types/media-types.xhtml + kid (str, optional): Key ID for the provided key + + Returns: + bytes: The string representation of the header, encrypted key, + initialization vector, ciphertext, and authentication tag. + + Raises: + JWEError: If there is an error signing the token. + + Examples: + >>> from jose import jwe + >>> jwe.encrypt('Hello, World!', 'asecret128bitkey', algorithm='dir', encryption='A128GCM') + 'eyJhbGciOiJkaXIiLCJlbmMiOiJBMTI4R0NNIn0..McILMB3dYsNJSuhcDzQshA.OfX9H_mcUpHDeRM4IA.CcnTWqaqxNsjT4eCaUABSg' + + """ + plaintext = six.ensure_binary(plaintext) # Make sure it's bytes + if algorithm not in ALGORITHMS.SUPPORTED: + raise JWEError('Algorithm %s not supported.' % algorithm) + if encryption not in ALGORITHMS.SUPPORTED: + raise JWEError('Algorithm %s not supported.' % encryption) + key = jwk.construct(key, algorithm) + encoded_header = _encoded_header(algorithm, encryption, zip, cty, kid) + + plaintext = _compress(zip, plaintext) + enc_cek, iv, cipher_text, auth_tag = _encrypt_and_auth( + key, algorithm, encryption, zip, plaintext, encoded_header) + + jwe_string = _jwe_compact_serialize( + encoded_header, enc_cek, iv, cipher_text, auth_tag) + return jwe_string + + +def decrypt(jwe_str, key): + """Decrypts a JWE compact serialized string and returns the plaintext. + + Args: + jwe_str (str): A JWE to be decrypt. + key (str or dict): A key to attempt to decrypt the payload with. Can be + individual JWK or JWK set. + + Returns: + bytes: The plaintext bytes, assuming the authentication tag is valid. + + Raises: + JWEError: If there is an exception verifying the token. + + Examples: + >>> from jose import jwe + >>> jwe.decrypt(jwe_string, 'asecret128bitkey') + 'Hello, World!' + """ + header, encoded_header, encrypted_key, iv, cipher_text, auth_tag = _jwe_compact_deserialize(jwe_str) + + # Verify that the implementation understands and can process all + # fields that it is required to support, whether required by this + # specification, by the algorithms being used, or by the "crit" + # Header Parameter value, and that the values of those parameters + # are also understood and supported. + + try: + # Determine the Key Management Mode employed by the algorithm + # specified by the "alg" (algorithm) Header Parameter. + alg = header["alg"] + enc = header["enc"] + if alg not in ALGORITHMS.SUPPORTED: + raise JWEError('Algorithm %s not supported.' % alg) + if enc not in ALGORITHMS.SUPPORTED: + raise JWEError('Algorithm %s not supported.' % enc) + + except KeyError: + raise JWEParseError("alg and enc headers are required!") + + # Verify that the JWE uses a key known to the recipient. + key = jwk.construct(key, alg) + + # When Direct Key Agreement or Key Agreement with Key Wrapping are + # employed, use the key agreement algorithm to compute the value + # of the agreed upon key. When Direct Key Agreement is employed, + # let the CEK be the agreed upon key. When Key Agreement with Key + # Wrapping is employed, the agreed upon key will be used to + # decrypt the JWE Encrypted Key. + # + # When Key Wrapping, Key Encryption, or Key Agreement with Key + # Wrapping are employed, decrypt the JWE Encrypted Key to produce + # the CEK. The CEK MUST have a length equal to that required for + # the content encryption algorithm. Note that when there are + # multiple recipients, each recipient will only be able to decrypt + # JWE Encrypted Key values that were encrypted to a key in that + # recipient's possession. It is therefore normal to only be able + # to decrypt one of the per-recipient JWE Encrypted Key values to + # obtain the CEK value. Also, see Section 11.5 for security + # considerations on mitigating timing attacks. + if alg == ALGORITHMS.DIR: + # When Direct Key Agreement or Direct Encryption are employed, + # verify that the JWE Encrypted Key value is an empty octet + # sequence. + + # Record whether the CEK could be successfully determined for this + # recipient or not. + cek_valid = encrypted_key == b"" + + # When Direct Encryption is employed, let the CEK be the shared + # symmetric key. + cek_bytes = _get_key_bytes_from_key(key) + else: + try: + cek_bytes = key.unwrap_key(encrypted_key) + + # Record whether the CEK could be successfully determined for this + # recipient or not. + cek_valid = True + except NotImplementedError: + raise JWEError("alg {} is not implemented".format(alg)) + except Exception: + # Record whether the CEK could be successfully determined for this + # recipient or not. + cek_valid = False + + # To mitigate the attacks described in RFC 3218 [RFC3218], the + # recipient MUST NOT distinguish between format, padding, and length + # errors of encrypted keys. It is strongly recommended, in the event + # of receiving an improperly formatted key, that the recipient + # substitute a randomly generated CEK and proceed to the next step, to + # mitigate timing attacks. + cek_bytes = _get_random_cek_bytes_for_enc(enc) + + # Compute the Encoded Protected Header value BASE64URL(UTF8(JWE + # Protected Header)). If the JWE Protected Header is not present + # (which can only happen when using the JWE JSON Serialization and + # no "protected" member is present), let this value be the empty + # string. + protected_header = encoded_header + + # Let the Additional Authenticated Data encryption parameter be + # ASCII(Encoded Protected Header). However, if a JWE AAD value is + # present (which can only be the case when using the JWE JSON + # Serialization), instead let the Additional Authenticated Data + # encryption parameter be ASCII(Encoded Protected Header || '.' || + # BASE64URL(JWE AAD)). + aad = protected_header + + # Decrypt the JWE Ciphertext using the CEK, the JWE Initialization + # Vector, the Additional Authenticated Data value, and the JWE + # Authentication Tag (which is the Authentication Tag input to the + # calculation) using the specified content encryption algorithm, + # returning the decrypted plaintext and validating the JWE + # Authentication Tag in the manner specified for the algorithm, + # rejecting the input without emitting any decrypted output if the + # JWE Authentication Tag is incorrect. + try: + plain_text = _decrypt_and_auth(cek_bytes, enc, cipher_text, iv, aad, auth_tag) + except NotImplementedError: + raise JWEError("enc {} is not implemented".format(enc)) + except Exception as e: + raise JWEError(e) + + # If a "zip" parameter was included, uncompress the decrypted + # plaintext using the specified compression algorithm. + if plain_text is not None: + plain_text = _decompress(header.get("zip"), plain_text) + + return plain_text if cek_valid else None + + +def get_unverified_header(jwe_str): + """Returns the decoded headers without verification of any kind. + + Args: + jwe_str (str): A compact serialized JWE to decode the headers from. + + Returns: + dict: The dict representation of the JWE headers. + + Raises: + JWEError: If there is an exception decoding the JWE. + """ + header = _jwe_compact_deserialize(jwe_str)[0] + return header + + +def _decrypt_and_auth(cek_bytes, enc, cipher_text, iv, aad, auth_tag): + """ + Decrypt and verify the data + + Args: + cek_bytes (bytes): cek to derive encryption and possible auth key to + verify the auth tag + cipher_text (bytes): Encrypted data + iv (bytes): Initialization vector (iv) used to encrypt data + aad (bytes): Additional Authenticated Data used to verify the data + auth_tag (bytes): Authentication ntag to verify the data + + Returns: + (bytes): Decrypted data + """ + # Decrypt the JWE Ciphertext using the CEK, the JWE Initialization + # Vector, the Additional Authenticated Data value, and the JWE + # Authentication Tag (which is the Authentication Tag input to the + # calculation) using the specified content encryption algorithm, + # returning the decrypted plaintext + # and validating the JWE + # Authentication Tag in the manner specified for the algorithm, + if enc in ALGORITHMS.HMAC_AUTH_TAG: + encryption_key, mac_key, key_len = _get_encryption_key_mac_key_and_key_length_from_cek(cek_bytes, enc) + auth_tag_check = _auth_tag(cipher_text, iv, aad, mac_key, key_len) + elif enc in ALGORITHMS.GCM: + encryption_key = jwk.construct(cek_bytes, enc) + auth_tag_check = auth_tag # GCM check auth on decrypt + else: + raise NotImplementedError("enc {} is not implemented!".format(enc)) + + plaintext = encryption_key.decrypt(cipher_text, iv, aad, auth_tag) + if auth_tag != auth_tag_check: + raise JWEError("Invalid JWE Auth Tag") + + return plaintext + + +def _get_encryption_key_mac_key_and_key_length_from_cek(cek_bytes, enc): + derived_key_len = len(cek_bytes) // 2 + mac_key_bytes = cek_bytes[0:derived_key_len] + mac_key = _get_hmac_key(enc, mac_key_bytes) + encryption_key_bytes = cek_bytes[-derived_key_len:] + encryption_alg, _ = enc.split("-") + encryption_key = jwk.construct(encryption_key_bytes, encryption_alg) + return encryption_key, mac_key, derived_key_len + + +def _jwe_compact_deserialize(jwe_bytes): + """ + Deserialize and verify the header and segments are appropriate. + + Args: + jwe_bytes (bytes): The compact serialized JWE + Returns: + (dict, bytes, bytes, bytes, bytes, bytes) + """ + + # Base64url decode the encoded representations of the JWE + # Protected Header, the JWE Encrypted Key, the JWE Initialization + # Vector, the JWE Ciphertext, the JWE Authentication Tag, and the + # JWE AAD, following the restriction that no line breaks, + # whitespace, or other additional characters have been used. + jwe_bytes = six.ensure_binary(jwe_bytes) + try: + header_segment, encrypted_key_segment, iv_segment, \ + cipher_text_segment, auth_tag_segment = jwe_bytes.split(b'.', 4) + header_data = base64url_decode(header_segment) + except ValueError: + raise JWEParseError('Not enough segments') + except (TypeError, binascii.Error): + raise JWEParseError('Invalid header') + + # Verify that the octet sequence resulting from decoding the + # encoded JWE Protected Header is a UTF-8-encoded representation + # of a completely valid JSON object conforming to RFC 7159 + # [RFC7159]; let the JWE Protected Header be this JSON object. + # + # If using the JWE Compact Serialization, let the JOSE Header be + # the JWE Protected Header. Otherwise, when using the JWE JSON + # Serialization, let the JOSE Header be the union of the members + # of the JWE Protected Header, the JWE Shared Unprotected Header + # and the corresponding JWE Per-Recipient Unprotected Header, all + # of which must be completely valid JSON objects. During this + # step, verify that the resulting JOSE Header does not contain + # duplicate Header Parameter names. When using the JWE JSON + # Serialization, this restriction includes that the same Header + # Parameter name also MUST NOT occur in distinct JSON object + # values that together comprise the JOSE Header. + + try: + header = json.loads(six.ensure_str(header_data)) + except ValueError as e: + raise JWEParseError('Invalid header string: %s' % e) + + if not isinstance(header, Mapping): + raise JWEParseError('Invalid header string: must be a json object') + + try: + encrypted_key = base64url_decode(encrypted_key_segment) + except (TypeError, binascii.Error): + raise JWEParseError('Invalid encrypted key') + + try: + iv = base64url_decode(iv_segment) + except (TypeError, binascii.Error): + raise JWEParseError('Invalid IV') + + try: + ciphertext = base64url_decode(cipher_text_segment) + except (TypeError, binascii.Error): + raise JWEParseError('Invalid cyphertext') + + try: + auth_tag = base64url_decode(auth_tag_segment) + except (TypeError, binascii.Error): + raise JWEParseError('Invalid auth tag') + + return header, header_segment, encrypted_key, iv, ciphertext, auth_tag + + +def _encoded_header(alg, enc, zip, cty, kid): + """ + Generate an appropriate JOSE header based on the values provided + Args: + alg (str): Key wrap/negotiation algorithm + enc (str): Encryption algorithm + zip (str): Compression method + cty (str): Content type of the encrypted data + kid (str): ID for the key used for the operation + + Returns: + bytes: JSON object of header based on input + """ + header = {"alg": alg, "enc": enc} + if zip: + header["zip"] = zip + if cty: + header["cty"] = cty + if kid: + header["kid"] = kid + json_header = json.dumps( + header, + separators=(',', ':'), + sort_keys=True, + ).encode('utf-8') + return base64url_encode(json_header) + + +def _big_endian(int_val): + return pack("!Q", int_val) + + +def _encrypt_and_auth(key, alg, enc, zip, plaintext, aad): + """ + Generate a content encryption key (cek) and initialization + vector (iv) based on enc and alg, compress the plaintext based on zip, + encrypt the compressed plaintext using the cek and iv based on enc + + Args: + key (Key): The key provided for encryption + alg (str): The algorithm use for key wrap/negotiation + enc (str): The encryption algorithm with which to encrypt the plaintext + zip (str): The compression algorithm with which to compress the plaintext + plaintext (bytes): The data to encrypt + aad (str): Additional authentication data utilized for generating an + auth tag + + Returns: + (bytes, bytes, bytes, bytes): A tuple of the following data + (key wrapped cek, iv, cipher text, auth tag) + """ + try: + cek_bytes, kw_cek = _get_cek(enc, alg, key) + except NotImplementedError: + raise JWEError("alg {} is not implemented".format(alg)) + + if enc in ALGORITHMS.HMAC_AUTH_TAG: + encryption_key, mac_key, key_len = _get_encryption_key_mac_key_and_key_length_from_cek(cek_bytes, enc) + iv, ciphertext, tag = encryption_key.encrypt(plaintext, aad) + auth_tag = _auth_tag(ciphertext, iv, aad, mac_key, key_len) + elif enc in ALGORITHMS.GCM: + encryption_key = jwk.construct(cek_bytes, enc) + iv, ciphertext, auth_tag = encryption_key.encrypt(plaintext, aad) + else: + raise NotImplementedError("enc {} is not implemented!".format(enc)) + + return kw_cek, iv, ciphertext, auth_tag + + +def _get_hmac_key(enc, mac_key_bytes): + """ + Get an HMACKey for the provided encryption algorithm and key bytes + + Args: + enc (str): Encryption algorithm + mac_key_bytes (bytes): vytes for the HMAC key + + Returns: + (HMACKey): The key to perform HMAC actions + """ + _, hash_alg = enc.split("-") + mac_key = jwk.construct(mac_key_bytes, hash_alg) + return mac_key + + +def _compress(zip, plaintext): + """ + Compress the plaintext based on the algorithm supplied + + Args: + zip (str): Compression Algorithm + plaintext (bytes): plaintext to compress + + Returns: + (bytes): Compressed plaintext + """ + if zip not in ZIPS.SUPPORTED: + raise NotImplementedError("ZIP {} is not supported!") + if zip is None: + compressed = plaintext + elif zip == ZIPS.DEF: + compressed = zlib.compress(plaintext) + else: + raise NotImplementedError("ZIP {} is not implemented!") + return compressed + + +def _decompress(zip, compressed): + """ + Decompress the plaintext based on the algorithm supplied + + Args: + zip (str): Compression Algorithm + plaintext (bytes): plaintext to decompress + + Returns: + (bytes): Compressed plaintext + """ + if zip not in ZIPS.SUPPORTED: + raise NotImplementedError("ZIP {} is not supported!") + if zip is None: + decompressed = compressed + elif zip == ZIPS.DEF: + decompressed = zlib.decompress(compressed) + else: + raise NotImplementedError("ZIP {} is not implemented!") + return decompressed + + +def _get_cek(enc, alg, key): + """ + Get the content encryption key + + Args: + enc (str): Encryption algorithm + alg (str): kwy wrap/negotiation algorithm + key (Key): Key provided to encryption method + + Return: + (bytes, bytes): Tuple of (cek bytes and wrapped cek) + """ + if alg == ALGORITHMS.DIR: + cek, wrapped_cek = _get_direct_key_wrap_cek(key) + else: + cek, wrapped_cek = _get_key_wrap_cek(enc, key) + + return cek, wrapped_cek + + +def _get_direct_key_wrap_cek(key): + """ + Get the cek and wrapped cek from the encryption key direct + + Args: + key (Key): Key provided to encryption method + + Return: + (Key, bytes): Tuple of (cek Key object and wrapped cek) + """ + # Get the JWK data to determine how to derive the cek + jwk_data = key.to_dict() + if jwk_data["kty"] == "oct": + # Get the last half of an octal key as the cek + cek_bytes = _get_key_bytes_from_key(key) + wrapped_cek = b"" + else: + raise NotImplementedError( + "JWK type {} not supported!".format(jwk_data['kty'])) + return cek_bytes, wrapped_cek + + +def _get_key_bytes_from_key(key): + """ + Get the raw key bytes from a Key object + + Args: + key (Key): Key from which to extract the raw key bytes + Returns: + (bytes) key data + """ + jwk_data = key.to_dict() + encoded_key = jwk_data["k"] + cek_bytes = base64url_decode(encoded_key) + return cek_bytes + + +def _get_key_wrap_cek(enc, key): + """_get_rsa_key_wrap_cek + Get the content encryption key for RSA key wrap + + Args: + enc (str): Encryption algorithm + key (Key): Key provided to encryption method + + Returns: + (Key, bytes): Tuple of (cek Key object and wrapped cek) + """ + cek_bytes = _get_random_cek_bytes_for_enc(enc) + wrapped_cek = key.wrap_key(cek_bytes) + return cek_bytes, wrapped_cek + + +def _get_random_cek_bytes_for_enc(enc): + """ + Get the random cek bytes based on the encryptionn algorithm + + Args: + enc (str): Encryption algorithm + + Returns: + (bytes) random bytes for cek key + """ + if enc == ALGORITHMS.A128GCM: + num_bits = 128 + elif enc == ALGORITHMS.A192GCM: + num_bits = 192 + elif enc in (ALGORITHMS.A128CBC_HS256, ALGORITHMS.A256GCM): + num_bits = 256 + elif enc == ALGORITHMS.A192CBC_HS384: + num_bits = 384 + elif enc == ALGORITHMS.A256CBC_HS512: + num_bits = 512 + else: + raise NotImplementedError("{} not supported".format(enc)) + cek_bytes = get_random_bytes(num_bits // 8) + return cek_bytes + + +def _auth_tag(ciphertext, iv, aad, mac_key, tag_length): + """ + Get ann auth tag from the provided data + + Args: + ciphertext (bytes): Encrypted value + iv (bytes): Initialization vector + aad (bytes): Additional Authenticated Data + mac_key (bytes): Key to use in generating the MAC + tag_length (int): How log the tag should be + + Returns: + (bytes) Auth tag + """ + al = _big_endian(len(aad) * 8) + auth_tag_input = aad + iv + ciphertext + al + signature = mac_key.sign(auth_tag_input) + auth_tag = signature[0:tag_length] + return auth_tag + + +def _jwe_compact_serialize(encoded_header, encrypted_cek, iv, cipher_text, auth_tag): + """ + Generate a compact serialized JWE + + Args: + encoded_header (bytes): Base64 URL Encoded JWE header JSON + encrypted_cek (bytes): Encrypted content encryption key (cek) + iv (bytes): Initialization vector (IV) + cipher_text (bytes): Cipher text + auth_tag (bytes): JWE Auth Tag + + Returns: + (str): JWE compact serialized string + """ + cipher_text = six.ensure_binary(cipher_text) # Maker sure it's bytes + encoded_encrypted_cek = base64url_encode(encrypted_cek) + encoded_iv = base64url_encode(iv) + encoded_cipher_text = base64url_encode(cipher_text) + encoded_auth_tag = base64url_encode(auth_tag) + return encoded_header + b"." + encoded_encrypted_cek + b"." + \ + encoded_iv + b"." + encoded_cipher_text + b"." + encoded_auth_tag diff --git a/jose/jwk.py b/jose/jwk.py index b2c1113a..aea8761b 100644 --- a/jose/jwk.py +++ b/jose/jwk.py @@ -1,12 +1,6 @@ - -import hashlib -import hmac -import six - +from jose.backends.base import Key from jose.constants import ALGORITHMS from jose.exceptions import JWKError -from jose.utils import base64url_decode, base64url_encode -from jose.backends.base import Key try: from jose.backends import RSAKey # noqa: F401 @@ -18,11 +12,26 @@ except ImportError: pass +try: + from jose.backends import AESKey # noqa: F401 +except ImportError: + pass + +try: + from jose.backends import DIRKey # noqa: F401 +except ImportError: + pass + +try: + from jose.backends import HMACKey # noqa: F401 +except ImportError: + pass + def get_key(algorithm): if algorithm in ALGORITHMS.KEYS: return ALGORITHMS.KEYS[algorithm] - elif algorithm in ALGORITHMS.HMAC: + elif algorithm in ALGORITHMS.HMAC: # noqa: F811 return HMACKey elif algorithm in ALGORITHMS.RSA: from jose.backends import RSAKey # noqa: F811 @@ -30,6 +39,12 @@ def get_key(algorithm): elif algorithm in ALGORITHMS.EC: from jose.backends import ECKey # noqa: F811 return ECKey + elif algorithm in ALGORITHMS.AES: + from jose.backends import AESKey # noqa: F811 + return AESKey + elif algorithm == ALGORITHMS.DIR: + from jose.backends import DIRKey # noqa: F811 + return DIRKey return None @@ -58,84 +73,3 @@ def construct(key_data, algorithm=None): if not key_class: raise JWKError('Unable to find an algorithm for key: %s' % key_data) return key_class(key_data, algorithm) - - -def get_algorithm_object(algorithm): - algorithms = { - ALGORITHMS.HS256: 'SHA256', - ALGORITHMS.HS384: 'SHA384', - ALGORITHMS.HS512: 'SHA512', - ALGORITHMS.RS256: 'SHA256', - ALGORITHMS.RS384: 'SHA384', - ALGORITHMS.RS512: 'SHA512', - ALGORITHMS.ES256: 'SHA256', - ALGORITHMS.ES384: 'SHA384', - ALGORITHMS.ES512: 'SHA512', - } - key = get_key(algorithm) - attr = algorithms.get(algorithm, None) - return getattr(key, attr) - - -class HMACKey(Key): - """ - Performs signing and verification operations using HMAC - and the specified hash function. - """ - SHA256 = hashlib.sha256 - SHA384 = hashlib.sha384 - SHA512 = hashlib.sha512 - - def __init__(self, key, algorithm): - if algorithm not in ALGORITHMS.HMAC: - raise JWKError('hash_alg: %s is not a valid hash algorithm' % algorithm) - self._algorithm = algorithm - self.hash_alg = get_algorithm_object(algorithm) - - if isinstance(key, dict): - self.prepared_key = self._process_jwk(key) - return - - if not isinstance(key, six.string_types) and not isinstance(key, bytes): - raise JWKError('Expecting a string- or bytes-formatted key.') - - if isinstance(key, six.text_type): - key = key.encode('utf-8') - - invalid_strings = [ - b'-----BEGIN PUBLIC KEY-----', - b'-----BEGIN RSA PUBLIC KEY-----', - b'-----BEGIN CERTIFICATE-----', - b'ssh-rsa' - ] - - if any(string_value in key for string_value in invalid_strings): - raise JWKError( - 'The specified key is an asymmetric key or x509 certificate and' - ' should not be used as an HMAC secret.') - - self.prepared_key = key - - def _process_jwk(self, jwk_dict): - if not jwk_dict.get('kty') == 'oct': - raise JWKError("Incorrect key type. Expected: 'oct', Received: %s" % jwk_dict.get('kty')) - - k = jwk_dict.get('k') - k = k.encode('utf-8') - k = bytes(k) - k = base64url_decode(k) - - return k - - def sign(self, msg): - return hmac.new(self.prepared_key, msg, self.hash_alg).digest() - - def verify(self, msg, sig): - return hmac.compare_digest(sig, self.sign(msg)) - - def to_dict(self): - return { - 'alg': self._algorithm, - 'kty': 'oct', - 'k': base64url_encode(self.prepared_key).decode('ASCII'), - } diff --git a/jose/utils.py b/jose/utils.py index e859f4c2..b1aa2ba9 100644 --- a/jose/utils.py +++ b/jose/utils.py @@ -1,9 +1,10 @@ import base64 -import six import struct import sys +import six + if sys.version_info > (3,): # Deal with integer compatibilities between Python 2 and 3. # Using `from builtins import int` is not supported on AppEngine. diff --git a/requirements-dev.txt b/requirements-dev.txt index 067f9d0a..6a6b7ac7 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -6,7 +6,7 @@ cryptography==2.4.2 docopt==0.6.2 nose==1.3.6 py==1.5.4 -pytest==4.1.1 +pytest==4.6.10 pytest-cov==2.6.1 # wsgiref is included in python standard library in Python 3, and will fail to install. wsgiref==0.1.2; python_version < "3.0" diff --git a/setup.py b/setup.py index eb4f8fda..7e7afbaf 100644 --- a/setup.py +++ b/setup.py @@ -66,6 +66,8 @@ def _cryptography_version(): 'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: 3.6', + 'Programming Language :: Python :: 3.7', + 'Programming Language :: Python :: 3.8', 'Programming Language :: Python :: Implementation :: PyPy', 'Topic :: Utilities', ], @@ -76,7 +78,7 @@ def _cryptography_version(): ], tests_require=[ 'six', - 'ecdsa', + 'ecdsa<0.15', 'pytest', 'pytest-cov', 'pytest-runner', diff --git a/tests/algorithms/test_AES.py b/tests/algorithms/test_AES.py new file mode 100644 index 00000000..dba92e69 --- /dev/null +++ b/tests/algorithms/test_AES.py @@ -0,0 +1,75 @@ +from binascii import unhexlify, hexlify + +import pytest +import six + +from jose.constants import ALGORITHMS + +try: + from jose.backends.pycrypto_backend import AESKey as PyCryptoAESKey +except ImportError as e: + PyCryptoAESKey = None + +try: + from jose.backends.cryptography_backend import CryptographyAESKey +except ImportError as e: + CryptographyAESKey = None + + +# List of Tuple of (alg, key, kek, wrapped) obtained from +# https://tools.ietf.org/html/rfc3394#section-2.2.3.1 +VECTORS = ( + (ALGORITHMS.A128KW, + six.ensure_binary("00112233445566778899AABBCCDDEEFF"), + six.ensure_binary("000102030405060708090A0B0C0D0E0F"), + six.ensure_binary("1FA68B0A8112B447AEF34BD8FB5A7B829D3E862371D2CFE5")), + (ALGORITHMS.A192KW, + six.ensure_binary("00112233445566778899AABBCCDDEEFF0001020304050607"), + six.ensure_binary("000102030405060708090A0B0C0D0E0F1011121314151617"), + six.ensure_binary("031D33264E15D33268F24EC260743EDCE1C6C7DDEE725A936BA814915C6762D2")), + (ALGORITHMS.A256KW, + six.ensure_binary("00112233445566778899AABBCCDDEEFF000102030405060708090A0B0C0D0E0F"), + six.ensure_binary("000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F"), + six.ensure_binary("28C9F404C4B810F4CBCCB35CFB87F8263F5786E2D80ED326CBC7F0E71A99F43BFB988B9B7A02DD21")), +) + + +@pytest.mark.cryptography +@pytest.mark.skipif(PyCryptoAESKey is None, reason="Cryptography backend not available") +class TestCryptographyAesKeywrap(): + @pytest.mark.parametrize("alg,hex_key,hex_kek,expected", VECTORS) + def test_wrap(self, alg, hex_key, hex_kek, expected): + bin_key = unhexlify(hex_key) + bin_kek = unhexlify(hex_kek) + aes_key = CryptographyAESKey(bin_kek, alg) + actual = hexlify(aes_key.wrap_key(bin_key)).upper() + assert actual == expected + + @pytest.mark.parametrize("alg,expected,hex_kek,hex_wrapped", VECTORS) + def test_unwrap(self, alg, expected, hex_kek, hex_wrapped): + bin_kek = unhexlify(hex_kek) + bin_wrapped = unhexlify(hex_wrapped) + aes_key = CryptographyAESKey(bin_kek, alg) + actual = hexlify(aes_key.unwrap_key(bin_wrapped)).upper() + assert actual == expected + + +@pytest.mark.pycrypto +@pytest.mark.pycryptodome +@pytest.mark.skipif(PyCryptoAESKey is None, reason="Pycrypto/dome backend not available") +class TestPycryptoAesKeywrap(): + @pytest.mark.parametrize("alg,hex_key,hex_kek,expected", VECTORS) + def test_wrap(self, alg, hex_key, hex_kek, expected): + bin_key = unhexlify(hex_key) + bin_kek = unhexlify(hex_kek) + aes_key = PyCryptoAESKey(bin_kek, alg) + actual = hexlify(aes_key.wrap_key(bin_key)).upper() + assert actual == expected + + @pytest.mark.parametrize("alg,expected,hex_kek,hex_wrapped", VECTORS) + def test_unwrap(self, alg, expected, hex_kek, hex_wrapped): + bin_kek = unhexlify(hex_kek) + bin_wrapped = unhexlify(hex_wrapped) + aes_key = PyCryptoAESKey(bin_kek, alg) + actual = hexlify(aes_key.unwrap_key(bin_wrapped)).upper() + assert actual == expected diff --git a/tests/algorithms/test_AES_compat.py b/tests/algorithms/test_AES_compat.py new file mode 100644 index 00000000..9bec51cd --- /dev/null +++ b/tests/algorithms/test_AES_compat.py @@ -0,0 +1,70 @@ +import pytest + +try: + from jose.backends.pycrypto_backend import AESKey as PyCryptoAESKey + from jose.backends.cryptography_backend import CryptographyAESKey +except ImportError: + PyCryptoAESKey = CryptographyAESKey = None +from jose.exceptions import JWEError +from jose.constants import ALGORITHMS + +CRYPTO_BACKENDS = ( + pytest.param(CryptographyAESKey, id="pyca/cryptography"), + pytest.param(PyCryptoAESKey, id="pycrypto/dome"), +) + + +@pytest.mark.backend_compatibility +@pytest.mark.skipif( + None in (CryptographyAESKey, PyCryptoAESKey), + reason="Multiple crypto backends not available for backend compatibility tests" +) +class TestBackendAesCompatibility(object): + @pytest.mark.parametrize("backend_decrypt", CRYPTO_BACKENDS) + @pytest.mark.parametrize("backend_encrypt", CRYPTO_BACKENDS) + @pytest.mark.parametrize("algorithm", ALGORITHMS.AES_PSEUDO) + def test_encryption_parity(self, backend_encrypt, backend_decrypt, algorithm): + if "128" in algorithm: + key = b"8slRzzty6dKMaFCP" + elif "192" in algorithm: + key = b"8slRzzty6dKMaFCP8slRzzty" + else: + key = b"8slRzzty6dKMaFCP8slRzzty6dKMaFCP" + + key_encrypt = backend_encrypt(key, algorithm) + key_decrypt = backend_decrypt(key, algorithm) + plain_text = b"test" + aad = b"extra data" if "GCM" in algorithm else None + + iv, cipher_text, tag = key_encrypt.encrypt(plain_text, aad) + + # verify decrypt to original plain text + actual = key_decrypt.decrypt(cipher_text, iv, aad, tag) + assert actual == plain_text + + with pytest.raises(JWEError): + key_decrypt.decrypt(b'n' * 64) + + @pytest.mark.parametrize("backend_key_wrap", CRYPTO_BACKENDS) + @pytest.mark.parametrize("backend_key_unwrap", CRYPTO_BACKENDS) + @pytest.mark.parametrize("algorithm", ALGORITHMS.AES_KW) + def test_wrap_parity(self, backend_key_wrap, backend_key_unwrap, algorithm): + if "128" in algorithm: + key = b"8slRzzty6dKMaFCP" + elif "192" in algorithm: + key = b"8slRzzty6dKMaFCP8slRzzty" + else: + key = b"8slRzzty6dKMaFCP8slRzzty6dKMaFCP" + + key_wrap = backend_key_wrap(key, algorithm) + key_unwrap = backend_key_unwrap(key, algorithm) + plain_text = b"sixteen byte key" + + wrapped_key = key_wrap.wrap_key(plain_text) + + # verify unwrap_key to original plain text + actual = key_unwrap.unwrap_key(wrapped_key) + assert actual == plain_text + + with pytest.raises(JWEError): + key_unwrap.decrypt(b'n' * 64) diff --git a/tests/algorithms/test_HMAC.py b/tests/algorithms/test_HMAC.py index 843d3a28..e6f5445e 100644 --- a/tests/algorithms/test_HMAC.py +++ b/tests/algorithms/test_HMAC.py @@ -2,7 +2,7 @@ from jose.constants import ALGORITHMS from jose.exceptions import JOSEError -from jose.jwk import HMACKey +from jose.backends.native import HMACKey import pytest diff --git a/tests/algorithms/test_HMAC_compat.py b/tests/algorithms/test_HMAC_compat.py new file mode 100644 index 00000000..7ffcef0d --- /dev/null +++ b/tests/algorithms/test_HMAC_compat.py @@ -0,0 +1,47 @@ +import pytest + +try: + from jose.backends.cryptography_backend import CryptographyHMACKey +except ImportError: + CryptographyHMACKey = None + +from jose.backends.native import HMACKey +from jose.constants import ALGORITHMS + +CRYPTO_BACKENDS = ( + pytest.param(CryptographyHMACKey, id="pyca/cryptography"), + pytest.param(HMACKey, id="native"), +) + +SUPPORTED_ALGORITHMS = ALGORITHMS.HMAC + + +@pytest.mark.backend_compatibility +@pytest.mark.skipif( + CryptographyHMACKey is None, + reason="Multiple crypto backends not available for backend compatibility tests" +) +class TestBackendAesCompatibility(object): + @pytest.mark.parametrize("backend_sign", CRYPTO_BACKENDS) + @pytest.mark.parametrize("backend_verify", CRYPTO_BACKENDS) + @pytest.mark.parametrize("algorithm", SUPPORTED_ALGORITHMS) + def test_encryption_parity(self, backend_sign, backend_verify, algorithm): + if "128" in algorithm: + key = b"8slRzzty6dKMaFCP" + elif "192" in algorithm: + key = b"8slRzzty6dKMaFCP8slRzzty" + else: + key = b"8slRzzty6dKMaFCP8slRzzty6dKMaFCP" + + key_sign = backend_sign(key, algorithm) + key_verify = backend_verify(key, algorithm) + + message = b"test" + + digest = key_sign.sign(message) + + assert key_verify.verify(message, digest) + + assert not key_verify.verify(b"not the message", digest) + + assert not key_verify.verify(digest, b"not the digest") diff --git a/tests/algorithms/test_RSA.py b/tests/algorithms/test_RSA.py index cdcb1dae..39635520 100644 --- a/tests/algorithms/test_RSA.py +++ b/tests/algorithms/test_RSA.py @@ -169,6 +169,11 @@ -----END PRIVATE KEY----- """ +RSA_KW_ALGOS = ( + pytest.param(ALGORITHMS.RSA_OAEP, id="RSA_OAEP"), + pytest.param(ALGORITHMS.RSA_OAEP_256, id="RSA_OAEP_256") +) + def _legacy_invalid_private_key_pkcs8_der(): legacy_key = LEGACY_INVALID_PRIVATE_KEY_PKCS8_PEM.strip() @@ -235,7 +240,7 @@ def test_pycrypto_RSA_key_instance(): @pytest.mark.pycryptodome @pytest.mark.parametrize("private_key", PRIVATE_KEYS) @pytest.mark.skipif(None in (PyCryptoRSA, PyCryptoRSAKey), reason="Pycrypto/dome backend not available") -def test_pycrypto_unencoded_cleartext(private_key): +def test_pycrypto_sign_unencoded_cleartext(private_key): key = PyCryptoRSAKey(private_key, ALGORITHMS.RS256) msg = b'test' signature = key.sign(msg) @@ -245,6 +250,22 @@ def test_pycrypto_unencoded_cleartext(private_key): assert not bool(public_key.verify(msg, 1)) +# TODO: Unclear why this test was marked as only for pycrypto +@pytest.mark.pycrypto +@pytest.mark.pycryptodome +@pytest.mark.parametrize("private_key_pem", PRIVATE_KEYS) +@pytest.mark.parametrize("algorithm", RSA_KW_ALGOS) +@pytest.mark.skipif(None in (PyCryptoRSA, PyCryptoRSAKey), + reason="Pycrypto/dome backend not available") +def test_pycrypto_wrap_key_unencoded_cleartext(private_key_pem, algorithm): + private_key = PyCryptoRSAKey(private_key_pem, algorithm) + key = b'test' + public_key = private_key.public_key() + wrapped = public_key.wrap_key(key) + unwrapped = private_key.unwrap_key(wrapped) + assert unwrapped == key + + @pytest.mark.cryptography @pytest.mark.skipif( None in (default_backend, pyca_rsa, CryptographyRSAKey), @@ -264,6 +285,22 @@ def test_cryptography_RSA_key_instance(): assert pem.startswith(b'-----BEGIN PUBLIC KEY-----') +@pytest.mark.cryptography +@pytest.mark.parametrize("private_key_pem", PRIVATE_KEYS) +@pytest.mark.parametrize("algorithm", RSA_KW_ALGOS) +@pytest.mark.skipif( + None in (default_backend, pyca_rsa, CryptographyRSAKey), + reason="Cryptography backend not available" +) +def test_cryptography_wrap_key_unencoded_cleartext(private_key_pem, algorithm): + private_key = CryptographyRSAKey(private_key_pem, algorithm) + key = b'test' + public_key = private_key.public_key() + wrapped = public_key.wrap_key(key) + unwrapped = private_key.unwrap_key(wrapped) + assert unwrapped == key + + class TestRSAAlgorithm: def test_RSA_key(self): assert not RSAKey(private_key_4096_pkcs1, ALGORITHMS.RS256).is_public() diff --git a/tests/algorithms/test_RSA_compat.py b/tests/algorithms/test_RSA_compat.py index 0da03d50..a4f99b05 100644 --- a/tests/algorithms/test_RSA_compat.py +++ b/tests/algorithms/test_RSA_compat.py @@ -7,6 +7,7 @@ except ImportError: PurePythonRSAKey = CryptographyRSAKey = PyCryptoRSAKey = None from jose.constants import ALGORITHMS +from jose.exceptions import JWEError from .test_RSA import PRIVATE_KEYS @@ -100,3 +101,24 @@ def test_private_key_load_cycle(self, BackendFrom, BackendTo, encoding_save, enc key_2 = BackendTo(pem_load, ALGORITHMS.RS256) assert pem_reference == key_2.to_pem(encoding_save).strip() + + @pytest.mark.parametrize("backend_wrap", CRYPTO_BACKENDS) + @pytest.mark.parametrize("backend_unwrap", CRYPTO_BACKENDS) + @pytest.mark.parametrize("algorithm", filter(lambda x: x in ALGORITHMS.SUPPORTED, ALGORITHMS.RSA_KW)) + @pytest.mark.parametrize("private_key", PRIVATE_KEYS) + def test_key_wrap_parity(self, backend_wrap, backend_unwrap, private_key, algorithm): + if algorithm in (ALGORITHMS.RSA_OAEP, ALGORITHMS.RSA_OAEP_256) \ + and PurePythonRSAKey in (backend_wrap, backend_unwrap): + pytest.skip("Pure RSA does not support OAEP") + key_wrap = backend_wrap(private_key, algorithm).public_key() + key_unwrap = backend_unwrap(private_key, algorithm) + + unwrapped_key = b'test' + wrapped_key = key_wrap.wrap_key(unwrapped_key) + + # verify unwrap to original key + actual = key_unwrap.unwrap_key(wrapped_key) + assert actual == unwrapped_key + + with pytest.raises(JWEError): + key_unwrap.unwrap_key(b'n' * 64) diff --git a/tests/algorithms/test_base.py b/tests/algorithms/test_base.py index 6f7fcb75..7e20017d 100644 --- a/tests/algorithms/test_base.py +++ b/tests/algorithms/test_base.py @@ -17,3 +17,19 @@ def test_sign_is_interface(self, alg): def test_verify_is_interface(self, alg): with pytest.raises(NotImplementedError): alg.verify('msg', 'sig') + + def test_encrypt_is_interface(self, alg): + with pytest.raises(NotImplementedError): + alg.encrypt('plain text', ) + + def test_decrypt_is_interface(self, alg): + with pytest.raises(NotImplementedError): + alg.decrypt('plain text', iv='iv') + + def test_wrap_key_is_interface(self, alg): + with pytest.raises(NotImplementedError): + alg.wrap_key('plain text') + + def test_unwrap_key_is_interface(self, alg): + with pytest.raises(NotImplementedError): + alg.unwrap_key('plain text') diff --git a/tests/test_backends.py b/tests/test_backends.py index 6e633a19..72de7dd9 100644 --- a/tests/test_backends.py +++ b/tests/test_backends.py @@ -16,7 +16,27 @@ except ImportError: PurePythonRSAKey = None -from jose.backends import ECKey, RSAKey +try: + from jose.backends.cryptography_backend import CryptographyAESKey +except ImportError: + CryptographyAESKey = None +try: + from jose.backends.pycrypto_backend import AESKey as PyCryptoAESKey +except ImportError: + PyCryptoAESKey = None +try: + from jose.backends.cryptography_backend import CryptographyHMACKey +except ImportError: + CryptographyHMACKey = None + +from jose.backends.native import HMACKey as NativeHMACKey + +from jose.backends import ECKey, RSAKey, HMACKey + +try: + from jose.backends import AESKey +except ImportError: + AESKey = None def test_default_ec_backend(): @@ -33,3 +53,19 @@ def test_default_rsa_backend(): assert RSAKey is PyCryptoRSAKey else: assert RSAKey is PurePythonRSAKey + + +def test_default_aes_backend(): + if CryptographyAESKey is not None: + assert AESKey is CryptographyAESKey + elif PyCryptoAESKey is not None: + assert AESKey is PyCryptoAESKey + else: + assert AESKey is None + + +def test_default_hmac_backend(): + if CryptographyHMACKey is not None: + assert HMACKey is CryptographyHMACKey + else: + assert HMACKey is NativeHMACKey diff --git a/tests/test_jwe.py b/tests/test_jwe.py new file mode 100644 index 00000000..6d3c11fd --- /dev/null +++ b/tests/test_jwe.py @@ -0,0 +1,499 @@ +import json + +import pytest +import six + +import jose.backends +from jose import jwe +from jose.constants import ALGORITHMS, ZIPS +from jose.exceptions import JWEParseError +from jose.jwk import AESKey +from jose.jwk import RSAKey +from jose.utils import base64url_decode + +backends = [] +try: + import jose.backends.cryptography_backend # noqa E402 + backends.append(jose.backends.cryptography_backend) +except ImportError: + pass +try: + import jose.backends.pycrypto_backend # noqa E402 + backends.append(jose.backends.pycrypto_backend) +except ImportError: + pass +import jose.backends.native # noqa E402 + +try: + from jose.backends.rsa_backend import RSAKey as RSABackendRSAKey +except ImportError: + RSABackendRSAKey = None + +backends.append(jose.backends.native) + +PRIVATE_KEY_PEM = """-----BEGIN RSA PRIVATE KEY----- +MIIEowIBAAKCAQEA3AyQGW/Q8AKJH2Mfjv1c67iYcwIn+Z2tpqHDQQV9CfSx9CMs ++Zg2buopXJ7AWd03ZR08g9O2bmlJPIQV1He3vfzZH9+6aJAQLJ+VzpME2sXl5Boa +yla1JjyoH7ix/i02QHDTVClDMb6dy0rMVpc7cBxwgX54fcR5x3AMscYCTQrhQc7q +YRzoLTfP9lGJT1DgyGcOt4paa77z4uqqaQxQ4QqxM9in3DU0mzVxXigHVakjiS6v +kSNEhSl+VLIp1sHiOhOSpcxWkhTikjm+XpwE5H0L9I1mQ2e2nTvX7uADg/pgFMy0 +uP833rQzTxNqTTPJZFLtLkTyq1Hr2MUeQ3dRNQIDAQABAoIBAFK9+pVGAVeubGc7 ++4rl5EHSqKheQC/RRZGps+TILotG0n9NlsTHong0XpcwLn3b+89unemn+yorNtml +hRveZF3xLKealdppiVtuKoOBrsqgrWAHHNnGntkg58r9xRghYgv7IMu9tEGJPoZJ +uuo4daYjW36l0qLf9Ta0AGH8ZbMX2LnNO+r4EQmZ1YJShEYOS94WJnFB7XuZ/bQH +AI3IRPkQvXQNq1nnMxhAj91hOhJvTVCS04yVVzMkntcpeNP7pc7ARtSA5IepJvdK +HbcoSQ1aIK/NPkhiDs/KOoWdnB8Mqr3fXFTVJ3/YTJKwODugJ5QCbSyIC8JewgIn +d6mA6iECgYEA7028RNk65c5NRkv6rkveTT1ybrvYUUO/pbAlS4MqZmtx69n4LFrW +qicXw7sJd+O8emyvF3xHPAfVviJKg6yudtI0nM9WUuOgKr+qoKRWJMpspXdpjTXs +AQXrFAJjrDIFujsbnRmT2nbRX8nSBWvI5oSG4JqILWYs0OdchIkPo0kCgYEA62bq +mjnlz7Mqvznf8b9jOSEJKub81aUz/fK62gXcEdvffUdlDecAzotjryI678TvEBpI +w1rmHLND60o+Lczd3quyEPQfYrf8P4/6sqGfE/QtB7zKR1bXmkV0dNlr9h6zpm/Y +BpLNiqr3Ntf4OCkKiD6ch+sZ4NjKBCwzodolUo0CgYEAk/PEzfBcqM5nGmpJX8/K +bojqIiqDcKLpb4A7XreG1HHjqkVGWe4DwImQ+NO/497qnepqSqPsyuGxNe+vkD+I +UjBelQDfxzmywhtkXBOeqvp4N8lfeg33jx5gnCtqAoGe5ug6h2PT9QL3Kjj2X6Gn +QVZ4qY8BWMhONw6ENfEjuPkCgYBP0ps05vMdpgSVyXs9z4dG5QPlz2Pm0lk6AKgJ +rDj+uU8kfSQwPafRYgTQa0wO5/mkvTT1QYqMKuGaFJfXEgQeMJx2EUHfSMI5j4oU +LqfxrTfjysnQvQrpHioqQVvRnoGOq5hWSkt2fRjNORjLemc+4fRURo2E6B5Aofh0 +JrPHNQKBgBGYzDGJyFnu7GYTby18aPNkQYweNDM6aZ/tUN8yZ4ryq7QnodiKLe2b +VxSr8Y+1w4xRjN67PGrS3IpQX9CAoTqyBN7VLhuq/mixOPccmo/5ui3fig/WEYwK ++ox4tfIuhfmskPNS235vLwbNIBkzP3PWVM5Chq1pEnHQUeiZq3U+ +-----END RSA PRIVATE KEY----- +""" + +PUBLIC_KEY_PEM = """-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA3AyQGW/Q8AKJH2Mfjv1c +67iYcwIn+Z2tpqHDQQV9CfSx9CMs+Zg2buopXJ7AWd03ZR08g9O2bmlJPIQV1He3 +vfzZH9+6aJAQLJ+VzpME2sXl5Boayla1JjyoH7ix/i02QHDTVClDMb6dy0rMVpc7 +cBxwgX54fcR5x3AMscYCTQrhQc7qYRzoLTfP9lGJT1DgyGcOt4paa77z4uqqaQxQ +4QqxM9in3DU0mzVxXigHVakjiS6vkSNEhSl+VLIp1sHiOhOSpcxWkhTikjm+XpwE +5H0L9I1mQ2e2nTvX7uADg/pgFMy0uP833rQzTxNqTTPJZFLtLkTyq1Hr2MUeQ3dR +NQIDAQAB +-----END PUBLIC KEY----- +""" + +OCT_128_BIT_KEY = b"\x04\xd3\x1f\xc5T\x9d\xfc\xfe\x0bd\x9d\xfa?\xaaj\xce" +OCT_192_BIT_KEY = b"\x04\xd3\x1f\xc5T\x9d\xfc\xfe\x0bd\x9d\xfa?\xaaj\xcek|\xd4-ok\t\xdb" +OCT_256_BIT_KEY = b"\x04\xd3\x1f\xc5T\x9d\xfc\xfe\x0bd\x9d\xfa?\xaaj\xcek|\xd4-ok\t\xdb\xc8\xb1\x00\xf0\x8f\x9c,\xcf" +OCT_384_BIT_KEY = b"\x04\xd3\x1f\xc5T\x9d\xfc\xfe\x0bd\x9d\xfa?\xaaj\xcek|\xd4-ok\t\xdb\xc8\xb1\x00\xf0\x8f\x9c,\xcf\x04\xd3\x1f\xc5T\x9d\xfc\xfe\x0bd\x9d\xfa?\xaaj\xce" +OCT_512_BIT_KEY = b"\x04\xd3\x1f\xc5T\x9d\xfc\xfe\x0bd\x9d\xfa?\xaaj\xcek|\xd4-ok\t\xdb\xc8\xb1\x00\xf0\x8f\x9c,\xcf\x04\xd3\x1f\xc5T\x9d\xfc\xfe\x0bd\x9d\xfa?\xaaj\xcek|\xd4-ok\t\xdb\xc8\xb1\x00\xf0\x8f\x9c,\xcf" + + +class TestGetUnverifiedHeader(object): + + def test_valid_header_and_auth_tag(self): + expected_header = {u"alg": u"RSA1_5", u"enc": u"A128CBC-HS256"} + jwe_str = "eyJhbGciOiJSU0ExXzUiLCJlbmMiOiJBMTI4Q0JDLUhTMjU2In0." \ + "UGhIOguC7IuEvf_NPVaXsGMoLOmwvc1GyqlIKOK1nN94nHPoltGRhWhw7" \ + "Zx0-kFm1NJn8LE9XShH59_i8J0PH5ZZyNfGy2xGdULU7sHNF6Gp2vPLgN" \ + "Z__deLKxGHZ7PcHALUzoOegEI-8E66jX2E4zyJKx-YxzZIItRzC5hlRir" \ + "b6Y5Cl_p-ko3YvkkysZIFNPccxRU7qve1WYPxqbb2Yw8kZqa2rMWI5ng8" \ + "OtvzlV7elprCbuPhcCdZ6XDP0_F8rkXds2vE4X-ncOIM8hAYHHi29NX0m" \ + "cKiRaD0-D-ljQTP-cFPgwCp6X-nZZd9OHBv-B3oWh2TbqmScqXMR4gp_A" \ + "." \ + "AxY8DCtDaGlsbGljb3RoZQ." \ + "KDlTtXchhZTGufMYmOYGS4HffxPSUrfmqCHXaI9wOGY." \ + "9hH0vgRfYgPnAHOd8stkvw" + actual_header = jwe.get_unverified_header(jwe_str) + assert expected_header == actual_header + + def test_invalid_jwe_string_raises_jwe_parse_error(self): + with pytest.raises(JWEParseError): + jwe.get_unverified_header("invalid jwe string") + + def test_non_json_header_section_raises_jwe_parse_error(self): + jwe_str = "not json." \ + "UGhIOguC7IuEvf_NPVaXsGMoLOmwvc1GyqlIKOK1nN94nHPoltGRhWhw7" \ + "Zx0-kFm1NJn8LE9XShH59_i8J0PH5ZZyNfGy2xGdULU7sHNF6Gp2vPLgN" \ + "Z__deLKxGHZ7PcHALUzoOegEI-8E66jX2E4zyJKx-YxzZIItRzC5hlRir" \ + "b6Y5Cl_p-ko3YvkkysZIFNPccxRU7qve1WYPxqbb2Yw8kZqa2rMWI5ng8" \ + "OtvzlV7elprCbuPhcCdZ6XDP0_F8rkXds2vE4X-ncOIM8hAYHHi29NX0m" \ + "cKiRaD0-D-ljQTP-cFPgwCp6X-nZZd9OHBv-B3oWh2TbqmScqXMR4gp_A" \ + "." \ + "AxY8DCtDaGlsbGljb3RoZQ." \ + "KDlTtXchhZTGufMYmOYGS4HffxPSUrfmqCHXaI9wOGY." \ + "9hH0vgRfYgPnAHOd8stkvw" + + with pytest.raises(JWEParseError): + jwe.get_unverified_header(jwe_str) + + def test_wrong_auth_tag_is_ignored(self): + expected_header = {u"alg": u"RSA1_5", u"enc": u"A128CBC-HS256"} + jwe_str = "eyJhbGciOiJSU0ExXzUiLCJlbmMiOiJBMTI4Q0JDLUhTMjU2In0." \ + "UGhIOguC7IuEvf_NPVaXsGMoLOmwvc1GyqlIKOK1nN94nHPoltGRhWhw7" \ + "Zx0-kFm1NJn8LE9XShH59_i8J0PH5ZZyNfGy2xGdULU7sHNF6Gp2vPLgN" \ + "Z__deLKxGHZ7PcHALUzoOegEI-8E66jX2E4zyJKx-YxzZIItRzC5hlRir" \ + "b6Y5Cl_p-ko3YvkkysZIFNPccxRU7qve1WYPxqbb2Yw8kZqa2rMWI5ng8" \ + "OtvzlV7elprCbuPhcCdZ6XDP0_F8rkXds2vE4X-ncOIM8hAYHHi29NX0m" \ + "cKiRaD0-D-ljQTP-cFPgwCp6X-nZZd9OHBv-B3oWh2TbqmScqXMR4gp_A" \ + "." \ + "AxY8DCtDaGlsbGljb3RoZQ." \ + "KDlTtXchhZTGufMYmOYGS4HffxPSUrfmqCHXaI9wOGY." \ + "invalid" + actual_header = jwe.get_unverified_header(jwe_str) + assert expected_header == actual_header + + +@pytest.mark.skipif(AESKey is None, reason="Test requires AES Backend") +@pytest.mark.skipif(RSAKey is RSABackendRSAKey, reason="RSA Backend does not support all modes") +class TestDecrypt(object): + + JWE_RSA_PACKAGES = ( + pytest.param( + b"eyJhbGciOiJSU0ExXzUiLCJlbmMiOiJBMTI4Q0JDLUhTMjU2In0.qHxZy-MfqRjCDAieY5AoU75XRGS7S-Xx4NytHgNa5dmGh9R8q1riHyPw5Hec_D395fKqV75u1hKke5r-jgiDTaCicQjOuxM2cSaiFlUid7dk5zIucaKH84N8jMzq3PwBePmGftePM2NMCzs6RvWBFP5SnDHh95NU2Xd-rIUICA7zIBXTwNRsB2LM9c_TZv1qh59DYoiSHWy94WXJBNFqViuVLmjVz5250J6Q4uRiYKGJKEGkfLDUp18N97aw5RQ35jJF6QyO5JkeLFTA0L10QAEtM8RjBRrKYgJ6fJLCVbHHTf7EKdn6Z-4cIZKtYe2d7PPKa0ZWZvtYTuU1S6DgmA.gdSr6lSIci4GjzMsdLaK6g.4ynh6gGG4dzxpmNfZHo6o8Eqp1eXRhKzI2Tmde-IulU.cFUhLtodRUqZ1GfSO6e3pw", + id="alg: RSA1_5, enc: A128CBC-HS256" + ), + pytest.param( + b"eyJhbGciOiJSU0ExXzUiLCJlbmMiOiJBMTkyQ0JDLUhTMzg0In0.Ju8YCub_jjFt4WR_pOIyeiXLtfwhUl-FMNETu3PMRVV8v6pD2-X4AFNWeA2pAX1_DkUIJEP8J3mjFdZB_ah6wb1ab0je-aSk3d8di8ES93gv_DkwWHkz_cjbm2At3JEh2gO252O3Ychjn8C0gMnLiXJN9Qmg_nF1drpvSdhgFz0FEI-2NlhD-0d8yy0ROMaMEby7aX7ouXP6QI3PKiwFYgPB-dtMzvF2cmZl_g3sLde9l1-U2e8JIpAW8vqQCO8Jswr0B6nH_LjUIBUEWS5vipqTa_v9siaAgLI46T5kEMJhnRVjJHvIkfnFABn5fCCVtgx2VpVrNkcejqvfLjIyNg.qyfq0GH9NgQOjuyEIKRQdA.FUb4QogxGaOslBqaTlcYqGGmhMXS8uTXNY0mpV7VPkQ.gi1jZcKEJoBey_5YBxSFVDnZulAlRPkq", + id="alg: RSA1_5, enc: A192CBC-HS384" + ), + pytest.param( + b"eyJhbGciOiJSU0ExXzUiLCJlbmMiOiJBMjU2Q0JDLUhTNTEyIn0.2r5K6UQ4a8PDar1lpsLBNnMSwPuffn3vVnI-fbFCBKTzRUSgzWiMYKd9PCBFQIA5D3E8bwQiMY0tgiHNuCZF4PaLJp99SVKkbwp0H5681mFgpQ5c-QtPHMa5fA7_zOt1DRN67XddKTSKLm7_3RQ2twU4rg3DVS-aElZZSV74Rip_KKeoDvaoJBfPY4HPFqiR96dHLdLCoSzks1XzmRxo36cY2wb-4ztWUd2J5-_7ps1khUvffOMFJuox2zk9FYIqHXZQr9eL3n4cdF-M-tFvfjBenUThW97byckr1gyWzHCUOcaVHAP3jp1xubPahtkCpsOGAvqwiO9ahRtY0afhyw.xTKBz19OoA1Av0OfNVPgOg.FCNLcCHaOGBjQSLw8vJ_2K5ROdsm0m8YkKdkSGGzX98.M5fPe-ZDlF9xjS6YELgFS30sllUK_5FZ0vBqmmKCWpY", + id="alg: RSA1_5, enc: A256CBC-HS512" + ), + pytest.param( + b"eyJhbGciOiJSU0ExXzUiLCJlbmMiOiJBMTI4R0NNIn0.QYbUBjDR7tf1NsbLOsVg3oub--eOgcm-a9BWJ3VIlwUWlE6ybdNFY-tgib69bFeDVJUgFipGbjpx99xbsn12F4dIZvDy0S9XWqKZ4GHXCtcButxyxyusQl-Qw0Myfd9OFEDmCnjCcU_Z2UamlsSK5c9OQa9F832bwlsOvufvexAUIoqNI94J6MCzWYn03zNcuKXd2EzbTXWRcxUL5RMQ_fFJb5mVEoRArw5H0Q9vCsjUkBGfvrLNr810yZrOIZLKrUW5Gq7vK2RR8GrPX1R1NIIrWe7FJgp1qr18-74q2vkNA8oGQitH1s0UJXXYObrJYZUZMGDh5NkGHyct1MwAqg.6GmP0pU4BfLq9vft.Lr_B5NID1Jsz1E-N9Hxz4PM7XV99sg.vNGa4jT1-N3eb7MZoj7REA", + id="alg: RSA1_5, enc: A128GCM" + ), + pytest.param( + b"eyJlbmMiOiJBMTkyR0NNIiwiYWxnIjoiUlNBMV81In0.pGI9inTliv1C52i9XOAVEXTcNR_KpOrK-flxdabnRFCCqVJDmvpoE1dO84FBTC0e0lSkfuGOdXOqOhgNho-rwtpKGeuAkk1X8NPmi-Cre6_hyZRcn-0M7tn4oqN-4JIh4FXSiMEJQfu2w7wTtZLX7FQvNRWYwl0klx_VB29rCEECTxvBDORmgT5N8WaEvqHb75X1SmO-t3JAlej2lJGKlrgThH7c5SUx0g702ccaMqORJ46JXKGGABqAUSwWpXozj5MimKg1UgVT6pXdj7MQtcMv_mhL7HIbUUZdTjbnkKmU-AH8rwJdIXsR5vosnzv_xOxf4BSOutkjqCBD7-psFw.AMBAA8ZpTm0c96TS.ehGiMXxn8bcH0yPmi9_d47UKc1C9hA.FyF6Wl57itn_W5hphdkXDA", + id="alg: RSA1_5, enc: A192GCM" + ), + pytest.param( + b"eyJhbGciOiJSU0ExXzUiLCJlbmMiOiJBMjU2R0NNIn0.FmKpIISKPpeA45DVJFzuHuZzuDBc9OblwI1pa80rwlKVB7GhhTpd4aXYWRLU4qMNUfGj_Imlxc0rYdOfPa1IvCrrED9KjR5H604ruZgJZigoYCkS3WnAUnMCIOaDSP_Ye2UC4OTwnDSXRIdgnoyM-g9l3fOjgSeoc2aCSRE5DGHrgEpvzaFWDl4YDD_im7IsFEM8H7H2TAlN7ftkbKN6jd9MMRDXd6y7HYvNm4Hi_gPDM70TWhj-LIb6NmJE19EAboy8Ul8HAFdaCAFxwlLa6tFQyOuw-PLnZQ_soLGZXUeFNuYOafIjmPL2tgJiHfj1K_IPZwmWZS2d4I45He3CRA.xAUHSwvfz51m45eo.XeSm9hkA2mUNPk9eiaZx-I7mY4ZJqg.T0S3B4H4KusBzyZos81EIQ", + id="alg: RSA1_5, enc: A256GCM" + ), + pytest.param( + b"eyJhbGciOiJSU0EtT0FFUCIsImVuYyI6IkExMjhDQkMtSFMyNTYifQ.wQh8pyyAMCQRMAeMMIXStaoBCytZ4Upd7hFqpGxkoHq6aCDjjXywERJqgx68co_vz29JkTlK0Z2UsUOLjM4M6TeEiKgw0zT7ENXehP6VeE0bo2_cCx0k8A_af2eJXpsaqIvRsdkqYCsSW96H_eq3PoqOx96DNWTHxY5OTDjthr8B5WCYx3qA1oepT1HXSfCDB_01Qg-OREMu6l4Qc3i-ci6kQfhoAHb-sowpM8tUPvOx28z9-3a5_HxWMh0jFez86d9RHCecJx1UxHMJ6GSCzd2ra2xKi1gqaiC8MZupjvVJeGEpb4uriFmw5zJ9YGnefLj9NPMvj79XTrjD4AalaA.o9RgfKTIB5wbkrRr-wkO0Q.7ejS9gM307dU3to_V3AtqukA14IhuFyLrRG9RmRH2cw.hXUMRYby8afLVMI3H-WHYw", + id="alg: RSA-OAEP, enc: A128CBC-HS256" + ), + pytest.param( + b"eyJhbGciOiJSU0EtT0FFUCIsImVuYyI6IkExOTJDQkMtSFMzODQifQ.u3QeBm1xbLlQSoDZJ5QFLT5KnTBvxHuh5WCb4Yt-jRVipJ_7DWBORoAsFXV-SB3oIeRlchcPX0QK2bz_uxFxNZGF9aLgROZXmyFGUs-S_6mewqnxiCgWcgM1fOvast6d65_Zrp8kgz8oev4EiuXwb2X1OO31BEOn3aZR7QGdD6O59q6pF79OU328hpKatqBjW4IdIgg68rtA2-87Xj9VqpqUBkgzJCf-z038yQR41GNVTRzMk6N2M3MgRYUFkqUHy59TRwplWQuRZ9vmkdotRGYI0ZQ7V5PzXhqYSJnx5Y9jYlIqv7sdz_b6lyqxkrtJGBRNfAFiil4HABIobx5YDw.2oKvl74hWoa3zpABph4L9Q.04KyNsCkVQAX-s547eYJOfj6SBR3cZypu2qy7ua4DUg.AKJwqOIH7wK3_7n_DmvZ96yq1vm3d6Mh", + id="alg: RSA-OAEP, enc: A192CBC-HS384" + ), + pytest.param( + b"eyJhbGciOiJSU0EtT0FFUCIsImVuYyI6IkEyNTZDQkMtSFM1MTIifQ.Kbd5rSN1afyre2DbkXOmGKkCNZ09TfAwNpDn1Ic7_HJNS42VDx584ReiEzpyIoWek8l87h1oZL0OC0f1ceEuuTR-_rZzKNqq6t44EvXvRusSHg_mTm8qYwyJIkJsD_Zgh0HUza20X6Ypu4ZheTzw70krFYhFnBKNXzhdrf4Bbz8e7IEeR7Po2VqOzx6JPNFsJ1tRSb9r4w60-1qq0MSdl2VItvHVY4fg-bts2k2sJ_Ub8VtRLY1MzPc1rFcI10x_AD52ntW-8T_BvY8R7Ci0cLfEycGlOM-pJOtJVY4bQisx-PvLgPoKlfTMX251m_np9ImSov9edy57-jy427l28g.w5rYu_XKzUCwTScFQ3fGOA.6zntLreCPN2Eo6aLmuqYrkyF2hOBXzNlArOOJ0iZ9TA.xiF5HLIBmIE8FCog-CZwXpIUjP6XgpncwXjw--dM57I", + id="alg: RSA-OAEP, enc: A256CBC-HS512" + ), + pytest.param( + b"eyJhbGciOiJSU0EtT0FFUCIsImVuYyI6IkExMjhHQ00ifQ.SUDoqix7_PhGaNeCxYEgmvZt-Bhj-EoPfnTbJpxgvdUSVk6cn2XjAJxiVHTaeM8_DPmxxeKqt-JEVljc7lUmHQpAW1Cule7ySw498OgG6q4ddpBZEPXqAHpqlfATrhGpEq0WPRZJwvbyKUd08rND1r4SePZg8sag6cvbiPbMHIzQSjGPkDwWt1P5ue7n1ySmxqGenjPlzl4g_n5wwPGG5e3RGmoiVQh2Stybp9j2fiLNzHKcO5_9BJxMR4DEB0DE3NGhszXFQneP009j4wxm5kKzuja0ks9tEdNAJ3NLWnQhU-w0_xeePj8SGxJXuGIQT0ox9yQlD-HnmlEqMWYplg.5XuF3e3g7ck1RRy8.VSph3xlmrPI3z6jcLdh862GaDq6_-g.3WcUUUcy1NZ-aFYU8u9KHA", + id="alg: RSA-OAEP, enc: A128GCM" + ), + pytest.param( + b"eyJlbmMiOiJBMTkyR0NNIiwiYWxnIjoiUlNBLU9BRVAifQ.Kw5PHADCCpBw63G-QwHuMK75gXlZzC_RJY1SH-7ABWvmnb1KWaDCtYWbNMl-4E-dlez-LKxCbATyCFo_1WKyJcRekue7YwmfSw-eYVNOYKi2al_7-xxY8vcfxnVnyIlCetGHOJPVgeDDXr1vjbdLgg2cJhO1lRi6mDypSHqKJtyhbAR3_AYdjELPMPIMQcMdsMHa9YF5vSqoj6DnB_Bc6oLFS2fSJPki5-Gq-raWUlfnGOXEMVTm3wZGyw13extRu-H8_b6YmarvQU2oSewhWwrF3fQMzCaTUNU_yxqA6x_oZrhEeTb_BL9Q6R1oYGEXBTVQhgzWMaVRD-HtkibFjQ.Vj-fCJQPordV5AMu.RQF0cTahIAY2a-1Nr68-XyghJn9piA.8KOygvGfOdn5Wr-u-EP9bQ", + id="alg: RSA-OAEP, enc: A192GCM" + ), + pytest.param( + b"eyJhbGciOiJSU0EtT0FFUCIsImVuYyI6IkEyNTZHQ00ifQ.kINITl6EJC8SY4Y8jejN1lnuwUeENgXUmYMS_wb2rcMga63pDieYdbm-ENlsFnFIC8ANukR_lx5TIhULJAVtPHFqN2Yyb8sOuG6JKX76E6DuBj1RdS6ejpVMBNNsiNYXYxvjsVnHMyBCE48zur9sZGFaHa3Sw-_Nnesm0ygo96AuTTnz6L-mzdpPK-EhWsA1fGaR0g0EpGyEjMh6NGp6n4BRqIbeSSOOwVW39akcnSs5Wl3gZq0tN0kArq_0dN4i-Yuqm30F65MQrTn7-nnjQCoXGkzlPlU9Ex-jWtkbqqjrHqJy-Gp_AVY24PRL7a_N5AHr1WHrcrkLdZEHmjGRMA.g0_LDNNkHJ7hUjGe.WwVpEFWAZ0GXhk2YhysMS9UMBs-yfQ.fTSHPmG68YG7VHIy0-r8vQ", + id="alg: RSA-OAEP, enc: A256GCM" + ), + pytest.param( + b"eyJhbGciOiJSU0EtT0FFUC0yNTYiLCJlbmMiOiJBMTI4Q0JDLUhTMjU2In0.K6cguIsijzwwak3cqBzKlTb3izuWdFDrvClKDscxuPCfSy_dEH-WMalroPtf8sLdEa1ocrZF7udDQk6_uhD3BGy4pytFvkIy8H9jw2o7bYGU7M2qvm7CKrAE2rxk-CU4CRZItF9PWIdKxKSdvMd2lojVgLuiQKPu0EvZFW4OeV4X77Fy-0b9PcGkbkJ9iehKHk9yjqGJAGMiyTOse7_-cyXgLMJgiSKQWPfAgHYGPN39PbH_cPjxGsl4WwawmUxnEmcQ2ctVrtfvbieupGpL9LkHXIf3I08LXh8hbYGKksWeZOBDhmtKWoAnP7PrjRNeAHIag4NqTlnA8ZXx7dtS2g.uU6nyQdGTAvfbNijkodnfQ.02Bukf1CnQWB_jYUDFSooXGzqDXW0QyKvIzE-slzQtw.Tu7u7yN8HPlS7oHmmc-OQQ", + id="alg: RSA-OAEP-256, enc: A128CBC-HS256" + ), + pytest.param( + b"eyJhbGciOiJSU0EtT0FFUC0yNTYiLCJlbmMiOiJBMTkyQ0JDLUhTMzg0In0.AKPATE5ww9Lcpbmo7OSA3ulVO4Y17mni5sYyLoc4Lvj6Wn9bHuzhFLyPA16qDDJsNE5pXxC5wemuAQugXQReeU_nSPsFYE_D7tUR4jMCrFZHMUshq0Cml7bgc34vXtBuxSMAHu16JjFI52mZKTHjFcBqCxDHE8EKWf7EdaPZf06swWKeZAnOAaRh2i9wVMzmpCJ9cFCYv0T31FTkr2XG1ydgZP2TAnMevRuTvtZ6e5xsc6lq0IH4nQCqKp6Hnb8aaoiKKbQMHNWAcmJzWYBpM2Sesv6zvzkacASMjwvx301dQKFVWV5x8Ocx2klcPFNdIgevWyT0-mLbbxgVAWFiaw.aoWEVUUMXkE7jbBBlG6UTg.fQmbAROAo1D6DHczAX3MH_eJfvRVHveJt6po1_jRud0.JSuCoAEXq4JUbZYYlGSqXd70QSr8V0U3", + id="alg: RSA-OAEP-256, enc: A192CBC-HS384" + ), + pytest.param( + b"eyJhbGciOiJSU0EtT0FFUC0yNTYiLCJlbmMiOiJBMjU2Q0JDLUhTNTEyIn0.nYIo9bUQgrQlnANR1IQI7EKPU75R2AoJR_44xXr5fEjEf444ucNbQvarO6HN5R_LQMEb0If7b8VyViMku1LuuFhYAoIfToT6SCcUgWG4vhN8mdc2Y4YsGqyF4k1c_EbQ3Gka_O04VZyhqukwpKUr89ASzqyJCWoP3kdiVfdjIkFnA_ApKGhnn2AwCy9_y8gW5TIVddYcOrQNVJtmxUWTgw6AxJSJkQztNfny6rbWdygXdeBXq7T4uAZYDquniE_h8f46SEUBb9UuMCq4eKVJZYJfPrKBVBMY9vncm-HAhl_IHzegLSJMgBWq_-idGMooxAypDg_Zi51zCpxinyrKeg.BiZjLouM-sJOpTprqKNVWw.0zL9BEdBAglQ-DQ2pBjJrRFsUt7qugRp3_nOY-sr75c.mcUVI1GvddAtqDMzElYzshrtS1GgnrUCb5brd2qzBlM", + id="alg: RSA-OAEP-256, enc: A256CBC-HS512" + ), + pytest.param( + b"eyJhbGciOiJSU0EtT0FFUC0yNTYiLCJlbmMiOiJBMTI4R0NNIn0.I8HnspRs9CiFyDyumZm5YthOVLl8Vn1unThm_EQd5YGcn0WPqXtrKeAWoP4rfOn7XaRNYeuLowpHEl-CzCjoEPEW-vui-t-P1JbDH6_wGwbdVIppdcwS6Npyv5qCNI21gPBDUB2twytEGqaYGKbbexxS8iE9iU4C_Wp-42axvUKEpxxNlQn-gPmHt4ZuzMGbI9Rl5wzT583SgmHwqXTklVC02aWQY2xQYelq5IVK-UBQ8J_NOBy7SeNeuAtmh7YxLGucSVlTqmzHImkOxsDU2UEiGJK-u8eGrgawx7DFSTUx8KXeMpsF2qe87PZhkSthpaqLFj1ZFQmVycnsN28IFg.C2qD0Dpiu2xWiDKj.o5WfgRbXOMzosaKtFCKpRyZ3nHJqLA.l8iOYFrtzGgd_x8ToB5d7w", + id="alg: RSA-OAEP-256, enc: A128GCM" + ), + pytest.param( + b"eyJlbmMiOiJBMTkyR0NNIiwiYWxnIjoiUlNBLU9BRVAtMjU2In0.f6ynrZWg8-lerNxAa5_i7S1KUmxMD7-d_DvpBEuSgH6GmOu8jkAiZDNdiX8EcsXGrRiZKNa0So92uRLRZNQ-gb9DAs8HCiXxERYkxN4EMjWlCq8T5gLQunIC-DIotde8deZFNnechKXmrO48VTPbfb8DyAwtXPtWJUBptldghCLXP63kwLpcQKKMNcAw_E1rAT6mJAiTnk3bOfKOZqdCIpwFfCPoPE-Ign_nmh2TlDX8VFkC2ZaT-CEwiQYhjmDrm6a9S3OEIfeKF-rkiGxPnrQCN3lZN2kCM5V2Wa98zmEYd1Ce-RuxB9GKAd4RUpoF84UtBUN9sGdNSasaTLzhHg.yQfUDlEQ88R6NCTm.sverha5tzKHC1T02_9WnJnt1pCmxDg.dxi-5Nz1-9u8becvm-z0EA", + id="alg: RSA-OAEP-256, enc: A192GCM" + ), + pytest.param( + b"eyJlbmMiOiJBMjU2R0NNIiwiYWxnIjoiUlNBLU9BRVAtMjU2In0.G0e3bfJXhYxHn-rnj0FgMd4obOFIg2DTNDKrgDLN_q7KCDYd4jmABFRNIDg_hjdHEMn8xxPhzXVFyNEGvuMPQtc9Eg6WUFdbMwwuEP3VEmTXz6qbE0E-yVC6SfUQxwbyf7jnx_bDuyKd67LaOLd7K6CyiBm7NYlFHNvGAXVEEsizCBSuGbGhoHLVOuQ0IFaW8qLyaMqLfoiwZpTajwTC_t3kyAK-WyD7lhPbUoNQd8Xuj5xEoAXxCqi_LVPgVRGaM9vV-EXERJfTrLt9D6NNbh6DpqDy4jvJpwqXGu58SQUe53gRxviPNvAhm6dWz8xiQ0VlI6fgu8QUc8hRi-f1aQ.A7LLQLgEoU32zDF-.5KvzCLZD6buklVSzHiJf0IlL6zU_Zg.3hs8tmElT4SpfCRhcAtHNA", + id="alg: RSA-OAEP-256, enc: A256GCM" + ), + ) + + JWE_128_BIT_OCT_PACKAGES = ( + pytest.param( + b"eyJlbmMiOiJBMTI4Q0JDLUhTMjU2IiwiYWxnIjoiQTEyOEtXIn0.n24LSLkqXdWX4YIaOj9dwlF-1t7hTytKdO5hqg3dQ24S6kIATishhA.JpEb2cELXXsKg8A2mIiZcQ.lbEuxBQPOy0osKUSjq_evT4GWB8U9EajBoe4HVLYb-U.9MTdcq_2zePAwKWdt2ORxQ", + id="alg: A128KW, enc: A128CBC-HS256" + ), + pytest.param( + b"eyJlbmMiOiJBMTkyQ0JDLUhTMzg0IiwiYWxnIjoiQTEyOEtXIn0.RxCzQYdCBk5KR89bLFaxXnMI02b2XjHll_fIALg92FDdvmBj84kMKRs3CYszqcLsEC5pZGji_cs.qItxFbHqLvUOU9-_kOldpQ.GEY1cC2jX2AZH5fBSr9JAuTNjL75oXLg_y_f5k5qrpI.dbx5ZSyhCdsR99uz3jlzdRBqq_bWr21V", + id="alg: A128KW, enc: A192CBC-HS384" + ), + pytest.param( + b"eyJlbmMiOiJBMjU2Q0JDLUhTNTEyIiwiYWxnIjoiQTEyOEtXIn0.ByJ5W2G8_vjD4Rl5kr6mYqiADET39cXvhhKQqTcu0OFFlBg8b5Auz1-n8LmPB-NF_4CTxd95RSn6Ykm5-CwYuRZ6plIh_VV_.YN9zjSsy0Hyq3yFR3RlKCw.9m3n0fZDmxxamWKoAvoyjCJtKJfLlc9U86tk5YgPz6Q.Mw30riFfQ7DbCe1pylfdN7XBhOnU58IG2g5i9-Stj7I", + id="alg: A128KW, enc: A256CBC-HS512" + ), + pytest.param( + b"eyJlbmMiOiJBMTI4R0NNIiwiYWxnIjoiQTEyOEtXIn0.PYaHt_cEHDwdKnmYyjkCg7T1HKrAy97a.WuIrAs7jHSsXqf9A.7g_Qp6DlNVrPptVpmzFDJ_1VPljD5w.RJXqRwBMyik9V1p96r-zFQ", + id="alg: A128KW, enc: A128GCM" + ), + pytest.param( + b"eyJlbmMiOiJBMTkyR0NNIiwiYWxnIjoiQTEyOEtXIn0.4lzeEoXilgqxK5mQ4_hLBLEygUe8bVhVTKjZ9pKPezw.Xf0HU0KkMCXEjeau.vnK_Ec_lnrxENj0tE-eLyPX3UO3vrg.vjIWpB_TtA73v93E0I7JvQ", + id="alg: A128KW, enc: A192GCM" + ), + pytest.param( + b"eyJlbmMiOiJBMjU2R0NNIiwiYWxnIjoiQTEyOEtXIn0.3PhAqtq7SE0CHpuhVhEViywAOn-w55vBvN1iGWKCn3-nioam-h-GIQ.4maA1p_t7_peTBZM.r60jOf8J5Y7lkFc8xBxtNl9yoC6jZA.SWPFQWHjLMzj0pq2CHJUBA", + id="alg: A128KW, enc: A256GCM" + ), + ) + + JWE_192_BIT_OCT_PACKAGES = ( + pytest.param( + b"eyJlbmMiOiJBMTI4Q0JDLUhTMjU2IiwiYWxnIjoiQTE5MktXIn0.RAXHkAPR_VsfFC5JAB0j24t_GdWa9udWTZZ_L18KE-qi9Au95oK-VA.J4YETSJp_EuV4AP0tWGIpw.VktlsPA1yF51IDXVtkrkmgHqPahz5-MjwAjCP0j3_EA.5h57BdovPem9fmyx-UcURA", + id="alg: A192KW, enc: A128CBC-HS256" + ), + pytest.param( + b"eyJlbmMiOiJBMTkyQ0JDLUhTMzg0IiwiYWxnIjoiQTE5MktXIn0.Z2AmR2Y6viywyGDPPO92V5MJCwfULSRGmeSjV4VHqEnVyUE-AJhKety8Kw5dS_ydWVpZ0IGe4S0.Ny9jR93JsAigFdJXrcb1hQ.jwvtxfGZC3O6P8lBFUSb5OTRLFVje6Fo1H0X5F4uv1w.p14y3-XZHA3FiFSvXdbTsaFkylbwIKn3", + id="alg: A192KW, enc: A192CBC-HS384" + ), + pytest.param( + b"eyJlbmMiOiJBMjU2Q0JDLUhTNTEyIiwiYWxnIjoiQTE5MktXIn0.wWChMcIkEaDfoMNfgsG1pomBzef0PYx_dJIe4V4JWeCS8RhH6_IzUb-zsgvyDUtKGeUHcwwQ66mpKnQO27-5p7cv6Geho9mq.5PcNZjsulZ3fTLu_NlQF2g.lXibtdYC3GsIiEtzkHqnOKu5uPrp6Fs8cdrakjZzQ4E.EuxTWElqFsG3lF4iJSGlzQKb3NXppEWQhWcpMOepjJE", + id="alg: A192KW, enc: A256CBC-HS512" + ), + pytest.param( + b"eyJlbmMiOiJBMTI4R0NNIiwiYWxnIjoiQTE5MktXIn0.acwCCSF6htimS4JReQVQii4RDwq9HD5a.JMdpDaFlJMMjm_Cz.tEkn2o4ngBafL16ldPcdR0VWhphi2w.3GpPpXKYtbPKzE6kTLtKEA", + id="alg: A192KW, enc: A128GCM" + ), + pytest.param( + b"eyJlbmMiOiJBMTkyR0NNIiwiYWxnIjoiQTE5MktXIn0.wXPSpLsBCaM0pBe9bFgUz-W0FLyAmGeRBI_VWD19rmU.XoiqwULpsnNvhwNo.sB3yhTbfBfWo7nbz8ZzLMX-RKzvrQw.Dn4XlsQpEjrf3mrjQ7sT5Q", + id="alg: A192KW, enc: A192GCM" + ), + pytest.param( + b"eyJlbmMiOiJBMjU2R0NNIiwiYWxnIjoiQTE5MktXIn0.DlGLalCXypefklVSenCRDRocRhHd3OI5vuAsxqTdDuVAks4PGbSkdw.zKpteENM-uElbKXK.cGvn4ozoLauLx-d7oEMJaLu-LttauA.1XOWJ6jZaxHWG13MUSPRAQ", + id="alg: A192KW, enc: A256GCM" + ), + ) + + JWE_256_BIT_OCT_PACKAGES = ( + pytest.param( + b"eyJlbmMiOiJBMTI4Q0JDLUhTMjU2IiwiYWxnIjoiQTI1NktXIn0.XoNzh3DeJkShGkoZUlIHN6OHiA7ku5WzI_e9HvddWf-W6ygXfjiS8g.qUSQp7nyMReRRgGfw_VYmg.rrsoeZ_IecEkOAwOLyXWAo8uATnevhQJnIG4Gs-xUX8.05BaSh2pSaowV2omCOUdrw", + id="alg: A256KW, enc: A128CBC-HS256" + ), + pytest.param( + b"eyJlbmMiOiJBMTkyQ0JDLUhTMzg0IiwiYWxnIjoiQTI1NktXIn0.b2-ui_1ksCzR28fUnBqtfwhKJZxklXboiN6AkhiDlOuj54lrn5CcHCjOOj_p5TwYWrFIEV3cQqw.zZqUrF5ygGZ27kPqWsx1bg.qAgz0LaznF_uyh4k37DesB0k5im-GwC9Au7l0dXVdhI.guaip_HKbIHbKZJCVXSKjcNv40w5aYZQ", + id="alg: A256KW, enc: A192CBC-HS384" + ), + pytest.param( + b"eyJlbmMiOiJBMjU2Q0JDLUhTNTEyIiwiYWxnIjoiQTI1NktXIn0.325MvraC58qFxXdD1gjRMwM_1NTW1-517eOckhcWuDUeAEUm6AHM9y1UsyC3StCDgFzDWbIZe3fayLh7OqVilr31gdofBWI9.hN1R-yoBJzALfcVFUvdKkQ.n-bQyooo7ufWn1CETJ8YFy9BFGWNgggrgoDlhmGI_Y8.6VyiR7w1osq6T8_rR-BAvyKAWAQSSA3oEc4jOPO7iJw", + id="alg: A256KW, enc: A256CBC-HS512" + ), + pytest.param( + b"eyJlbmMiOiJBMTI4R0NNIiwiYWxnIjoiQTI1NktXIn0.d4wqGBFQG-MrzDgbWWB23o9LUgCkaTYt.NkFLhQfcR2swvLT3.lrt2LS9nrUqB9BDahJLqR-DZxutraA.MbghLfohCD71xfX8lRpVAw", + id="alg: A256KW, enc: A128GCM" + ), + pytest.param( + b"eyJlbmMiOiJBMTkyR0NNIiwiYWxnIjoiQTI1NktXIn0.XhgSqN16cCtwttRTxRXJfYx6FL9c56Bjo6VQx8E6vGI.C--W8_faFWiCxTCM.ZyTZRiqdLEMOnwQytgAujl-t6nZ-ZQ.GqyRs7YnGsGlwUehCXmllA", + id="alg: A256KW, enc: A192GCM" + ), + pytest.param( + b"eyJlbmMiOiJBMjU2R0NNIiwiYWxnIjoiQTI1NktXIn0.i_-ehDezyG89YFcWqU-MxPB1HtVHauEAUGInnjlodx44IJBLS4ap4Q.nCEooStwaMWLfDxt.lqjEVnCRHCaufTIcxT2MzeBwUE2V-Q.sIr7c2QlWIYSVwnXUHgITA", + id="alg: A256KW, enc: A256GCM" + ), + ) + + @pytest.mark.parametrize("jwe_package", JWE_RSA_PACKAGES) + def test_decrypt_rsa_key_wrap(self, jwe_package): + headers = jwe.get_unverified_header(jwe_package) + if headers["alg"] not in ALGORITHMS.SUPPORTED: + pytest.skip("alg {} not supported".format(headers["alg"])) + if headers["enc"] not in ALGORITHMS.SUPPORTED: + pytest.skip("enc {} not supported".format(headers["enc"])) + key = PRIVATE_KEY_PEM + actual = jwe.decrypt(jwe_package, key) + assert actual == b"Live long and prosper." + + @pytest.mark.parametrize("jwe_package", JWE_128_BIT_OCT_PACKAGES) + def test_decrypt_oct_128_key_wrap(self, jwe_package): + key = OCT_128_BIT_KEY + headers = jwe.get_unverified_header(jwe_package) + if headers["alg"] not in ALGORITHMS.SUPPORTED: + pytest.skip("alg {} not supported".format(headers["alg"])) + if headers["enc"] not in ALGORITHMS.SUPPORTED: + pytest.skip("enc {} not supported".format(headers["enc"])) + actual = jwe.decrypt(jwe_package, key) + assert actual == b"Live long and prosper." + + @pytest.mark.parametrize("jwe_package", JWE_192_BIT_OCT_PACKAGES) + def test_decrypt_oct_192_key_wrap(self, jwe_package): + headers = jwe.get_unverified_header(jwe_package) + if headers["alg"] not in ALGORITHMS.SUPPORTED: + pytest.skip("alg {} not supported".format(headers["alg"])) + if headers["enc"] not in ALGORITHMS.SUPPORTED: + pytest.skip("enc {} not supported".format(headers["enc"])) + key = OCT_192_BIT_KEY + actual = jwe.decrypt(jwe_package, key) + assert actual == b"Live long and prosper." + + @pytest.mark.parametrize("jwe_package", JWE_256_BIT_OCT_PACKAGES) + def test_decrypt_oct_256_key_wrap(self, jwe_package): + headers = jwe.get_unverified_header(jwe_package) + if headers["alg"] not in ALGORITHMS.SUPPORTED: + pytest.skip("alg {} not supported".format(headers["alg"])) + if headers["enc"] not in ALGORITHMS.SUPPORTED: + pytest.skip("enc {} not supported".format(headers["enc"])) + key = OCT_256_BIT_KEY + actual = jwe.decrypt(jwe_package, key) + assert actual == b"Live long and prosper." + + def test_invalid_jwe_is_parse_error(self): + with pytest.raises(JWEParseError): + jwe.decrypt("invalid", "key") + + def test_non_json_header_is_parse_error(self): + jwe_str = "ciOiJSU0ExXzUiLCJlbmMiOiJBMTI4Q0JDLUhTMjU2In0." \ + "UGhIOguC7IuEvf_NPVaXsGMoLOmwvc1GyqlIKOK1nN94nHPoltGRhWhw7" \ + "Zx0-kFm1NJn8LE9XShH59_i8J0PH5ZZyNfGy2xGdULU7sHNF6Gp2vPLgN" \ + "Z__deLKxGHZ7PcHALUzoOegEI-8E66jX2E4zyJKx-YxzZIItRzC5hlRir" \ + "b6Y5Cl_p-ko3YvkkysZIFNPccxRU7qve1WYPxqbb2Yw8kZqa2rMWI5ng8" \ + "OtvzlV7elprCbuPhcCdZ6XDP0_F8rkXds2vE4X-ncOIM8hAYHHi29NX0m" \ + "cKiRaD0-D-ljQTP-cFPgwCp6X-nZZd9OHBv-B3oWh2TbqmScqXMR4gp_A" \ + "." \ + "AxY8DCtDaGlsbGljb3RoZQ." \ + "KDlTtXchhZTGufMYmOYGS4HffxPSUrfmqCHXaI9wOGY." \ + "9hH0vgRfYgPnAHOd8stkvw" + with pytest.raises(JWEParseError): + jwe.decrypt(jwe_str, "key") + + +class TestEncrypt(object): + + @pytest.mark.skipif(AESKey is None, reason="No AES backend") + def test_rfc7516_appendix_b_direct(self, monkeypatch): + algorithm = ALGORITHMS.DIR + encryption = ALGORITHMS.A128CBC_HS256 + key = bytes(bytearray( + [4, 211, 31, 197, 84, 157, 252, 254, 11, 100, 157, 250, 63, 170, + 106, 206, 107, 124, 212, 45, 111, 107, 9, 219, 200, 177, 0, 240, + 143, 156, 44, 207] + )) + plain_text = b"Live long and prosper." + expected_iv = bytes(bytearray([3, 22, 60, 12, 43, 67, 104, 105, 108, 108, 105, 99, 111, + 116, 104, 101])) + + for backend in backends: + monkeypatch.setattr(backend, "get_random_bytes", lambda x: expected_iv if x == 16 else key) + + expected = b"eyJhbGciOiJkaXIiLCJlbmMiOiJBMTI4Q0JDLUhTMjU2In0..AxY8DCtDaGlsbGljb3RoZQ.KDlTtXchhZTGufMYmOYGS4HffxPSUrfmqCHXaI9wOGY.BIiCkt8mWOVyJOqDMwNqaQ" + actual = jwe.encrypt(plain_text, key, encryption, algorithm) + + assert actual == expected + + @pytest.mark.skipif(AESKey is None, reason="No AES backend") + @pytest.mark.parametrize("alg", filter(lambda x: x in ALGORITHMS.SUPPORTED, ALGORITHMS.RSA_KW)) + @pytest.mark.parametrize("enc", filter(lambda x: x in ALGORITHMS.SUPPORTED, ALGORITHMS.AES_ENC)) + @pytest.mark.parametrize("zip", ZIPS.SUPPORTED) + def test_encrypt_decrypt_rsa_kw(self, alg, enc, zip): + expected = b"Live long and prosper." + jwe_value = jwe.encrypt(expected[:], PUBLIC_KEY_PEM, enc, alg, zip) + actual = jwe.decrypt(jwe_value, PRIVATE_KEY_PEM) + assert actual == expected + + @pytest.mark.skipif(AESKey is None, reason="No AES backend") + @pytest.mark.parametrize("alg", ALGORITHMS.AES_KW) + @pytest.mark.parametrize("enc", filter(lambda x: x in ALGORITHMS.SUPPORTED, ALGORITHMS.AES_ENC)) + @pytest.mark.parametrize("zip", ZIPS.SUPPORTED) + def test_encrypt_decrypt_aes_kw(self, alg, enc, zip): + if alg == ALGORITHMS.A128KW: + key = OCT_128_BIT_KEY + elif alg == ALGORITHMS.A192KW: + key = OCT_192_BIT_KEY + elif alg == ALGORITHMS.A256KW: + key = OCT_256_BIT_KEY + else: + pytest.fail("I don't know how to handle enc {}".format(alg)) + expected = b"Live long and prosper." + jwe_value = jwe.encrypt(expected[:], key, enc, alg, zip) + actual = jwe.decrypt(jwe_value, key) + assert actual == expected + + @pytest.mark.skipif(AESKey is None, reason="No AES backend") + @pytest.mark.parametrize("enc", filter(lambda x: x in ALGORITHMS.SUPPORTED, ALGORITHMS.AES_ENC)) + @pytest.mark.parametrize("zip", ZIPS.SUPPORTED) + def test_encrypt_decrypt_dir_kw(self, enc, zip): + if enc == ALGORITHMS.A128GCM: + key = OCT_128_BIT_KEY + elif enc == ALGORITHMS.A192GCM: + key = OCT_192_BIT_KEY + elif enc in (ALGORITHMS.A128CBC_HS256, ALGORITHMS.A256GCM): + key = OCT_256_BIT_KEY + elif enc == ALGORITHMS.A192CBC_HS384: + key = OCT_384_BIT_KEY + elif enc == ALGORITHMS.A256CBC_HS512: + key = OCT_512_BIT_KEY + else: + pytest.fail("I don't know how to handle enc {}".format(enc)) + expected = b"Live long and prosper." + jwe_value = jwe.encrypt(expected[:], key, enc, ALGORITHMS.DIR, zip) + actual = jwe.decrypt(jwe_value, key) + assert actual == expected + + @pytest.mark.skipif(AESKey is None, reason="No AES backend") + def test_alg_enc_headers(self): + enc = ALGORITHMS.A256CBC_HS512 + alg = ALGORITHMS.RSA_OAEP_256 + encrypted = jwe.encrypt("Text", PUBLIC_KEY_PEM, enc, alg) + header = json.loads(six.ensure_str(base64url_decode(encrypted.split(b".")[0]))) + assert header["enc"] == enc + assert header["alg"] == alg + + @pytest.mark.skipif(AESKey is None, reason="No AES backend") + def test_cty_header_present_when_provided(self): + enc = ALGORITHMS.A256CBC_HS512 + alg = ALGORITHMS.RSA_OAEP_256 + encrypted = jwe.encrypt("Text", PUBLIC_KEY_PEM, enc, alg, + cty="expected") + header = json.loads(six.ensure_str(base64url_decode(encrypted.split(b".")[0]))) + assert header["cty"] == "expected" + + @pytest.mark.skipif(AESKey is None, reason="No AES backend") + def test_cty_header_not_present_when_not_provided(self): + enc = ALGORITHMS.A256CBC_HS512 + alg = ALGORITHMS.RSA_OAEP_256 + encrypted = jwe.encrypt("Text", PUBLIC_KEY_PEM, enc, alg) + header = json.loads(six.ensure_str(base64url_decode(encrypted.split(b".")[0]))) + assert "cty" not in header + + @pytest.mark.skipif(AESKey is None, reason="No AES backend") + def test_zip_header_present_when_provided(self): + enc = ALGORITHMS.A256CBC_HS512 + alg = ALGORITHMS.RSA_OAEP_256 + encrypted = jwe.encrypt(b"Text", PUBLIC_KEY_PEM, enc, alg, + zip=ZIPS.DEF) + header = json.loads(six.ensure_str(base64url_decode(encrypted.split(b".")[0]))) + assert header["zip"] == ZIPS.DEF + + @pytest.mark.skipif(AESKey is None, reason="No AES backend") + def test_zip_header_not_present_when_not_provided(self): + enc = ALGORITHMS.A256CBC_HS512 + alg = ALGORITHMS.RSA_OAEP_256 + encrypted = jwe.encrypt(b"Text", PUBLIC_KEY_PEM, enc, alg) + header = json.loads(six.ensure_str(base64url_decode(encrypted.split(b".")[0]))) + assert "zip" not in header + + @pytest.mark.skipif(AESKey is None, reason="No AES backend") + def test_zip_header_not_present_when_none(self): + enc = ALGORITHMS.A256CBC_HS512 + alg = ALGORITHMS.RSA_OAEP_256 + encrypted = jwe.encrypt("Text", PUBLIC_KEY_PEM, enc, alg, + zip=ZIPS.NONE) + header = json.loads(six.ensure_str(base64url_decode(encrypted.split(b".")[0]))) + assert "zip" not in header + + @pytest.mark.skipif(AESKey is None, reason="No AES backend") + def test_kid_header_present_when_provided(self): + enc = ALGORITHMS.A256CBC_HS512 + alg = ALGORITHMS.RSA_OAEP_256 + encrypted = jwe.encrypt("Text", PUBLIC_KEY_PEM, enc, alg, + kid="expected") + header = json.loads(six.ensure_str(base64url_decode(encrypted.split(b".")[0]))) + assert header["kid"] == "expected" + + @pytest.mark.skipif(AESKey is None, reason="No AES backend") + def test_kid_header_not_present_when_not_provided(self): + enc = ALGORITHMS.A256CBC_HS512 + alg = ALGORITHMS.RSA_OAEP_256 + encrypted = jwe.encrypt("Text", PUBLIC_KEY_PEM, enc, alg) + header = json.loads(six.ensure_str(base64url_decode(encrypted.split(b".")[0]))) + assert "kid" not in header diff --git a/tests/test_jwk.py b/tests/test_jwk.py index 6ea5a1b8..aeeda5ac 100644 --- a/tests/test_jwk.py +++ b/tests/test_jwk.py @@ -1,7 +1,7 @@ from jose import jwk from jose.exceptions import JWKError from jose.backends.base import Key -from jose.backends import ECKey, RSAKey +from jose.backends import ECKey, RSAKey, HMACKey, AESKey import pytest @@ -45,7 +45,7 @@ def test_interface(self): def test_invalid_hash_alg(self): with pytest.raises(JWKError): - key = jwk.HMACKey(hmac_key, 'RS512') + key = HMACKey(hmac_key, 'RS512') with pytest.raises(JWKError): key = RSAKey(rsa_key, 'HS512') @@ -56,7 +56,7 @@ def test_invalid_hash_alg(self): def test_invalid_jwk(self): with pytest.raises(JWKError): - key = jwk.HMACKey(rsa_key, 'HS256') + key = HMACKey(rsa_key, 'HS256') with pytest.raises(JWKError): key = RSAKey(hmac_key, 'RS256') @@ -122,13 +122,17 @@ def test_construct_from_jwk_missing_alg(self): def test_get_key(self): hs_key = jwk.get_key("HS256") - assert hs_key == jwk.HMACKey + assert hs_key == HMACKey assert issubclass(hs_key, Key) assert issubclass(jwk.get_key("RS256"), Key) assert issubclass(jwk.get_key("ES256"), Key) assert jwk.get_key("NONEXISTENT") is None + @pytest.mark.skipif(AESKey is None, reason="No AES provider") + def test_get_aes_key(self): + assert issubclass(jwk.get_key("A256CBC-HS512"), Key) + def test_register_key(self): assert jwk.register_key("ALG", jwk.Key) assert jwk.get_key("ALG") == jwk.Key diff --git a/tests/test_jws.py b/tests/test_jws.py index b31e0b4f..2282c5f2 100644 --- a/tests/test_jws.py +++ b/tests/test_jws.py @@ -298,7 +298,6 @@ def test_wrong_key(self, payload): with pytest.raises(JWSError): jws.verify(token, rsa_public_key, ALGORITHMS.HS256) - @pytest.mark.skipif(RSAKey is CryptographyRSAKey, reason="Cryptography backend outright fails verification") def test_private_verify_raises_warning(self, payload): token = jws.sign(payload, rsa_private_key, algorithm='RS256') diff --git a/tox.ini b/tox.ini index 81f0ab76..efe7d15e 100644 --- a/tox.ini +++ b/tox.ini @@ -1,7 +1,7 @@ [tox] minversion = 3.4.0 envlist = - py{27,35,36,py}-{base,cryptography-only,pycryptodome-norsa,pycrypto-norsa,compatibility}, + py{27,35,36,37,38,39,py,py3}-{base,cryptography-only,pycryptodome-norsa,pycrypto-norsa,compatibility}, flake8 skip_missing_interpreters = True @@ -10,6 +10,18 @@ commands = pip --version py.test --cov-report term-missing --cov jose {posargs} +[testenv:pypy-compatibility] +# This testenv locks up during coverage so just run tests +commands = + pip --version + py.test + +[testenv:pypy3-compatibility] +# This testenv locks up during coverage so just run tests +commands = + pip --version + py.test + [testenv:compatibility] extras = cryptography @@ -21,7 +33,6 @@ deps = pytest pytest-cov pytest-runner - -r{toxinidir}/requirements.txt commands_pre = # Remove the python-rsa and python-ecdsa backends @@ -44,8 +55,13 @@ extras = pycryptodome: pycryptodome pycrypto: pycrypto compatibility: {[testenv:compatibility]extras} +setenv = + # pycrypto does not reliably build if GMP is installed and used so disable it + pypy: with_gmp=no + pypy3: with_gmp=no [testenv:flake8] +basepython = python3.6 skip_install= True deps = flake8