diff --git a/dev-requirements.txt b/dev-requirements.txt index 0c7a35a5..202a17ac 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -2,3 +2,4 @@ pytest mock;python_version<"3.3" pyopenssl git+git://github.com/sigmavirus24/betamax +trustme diff --git a/requests_toolbelt/_compat.py b/requests_toolbelt/_compat.py index 7927a382..e813461e 100644 --- a/requests_toolbelt/_compat.py +++ b/requests_toolbelt/_compat.py @@ -49,18 +49,6 @@ except ImportError: from urllib3.contrib import appengine as gaecontrib -if requests.__build__ < 0x021200: - PyOpenSSLContext = None -else: - try: - from requests.packages.urllib3.contrib.pyopenssl \ - import PyOpenSSLContext - except ImportError: - try: - from urllib3.contrib.pyopenssl import PyOpenSSLContext - except ImportError: - PyOpenSSLContext = None - PY3 = sys.version_info > (3, 0) if PY3: @@ -320,5 +308,4 @@ def from_httplib(cls, message): # Python 2 'urlencode', 'gaecontrib', 'urljoin', - 'PyOpenSSLContext', ) diff --git a/requests_toolbelt/adapters/x509.py b/requests_toolbelt/adapters/x509.py index 21bfacb9..aff37706 100644 --- a/requests_toolbelt/adapters/x509.py +++ b/requests_toolbelt/adapters/x509.py @@ -18,7 +18,6 @@ from requests.adapters import HTTPAdapter import requests -from .._compat import PyOpenSSLContext from .. import exceptions as exc """ @@ -32,6 +31,9 @@ from _ssl import PROTOCOL_SSLv23 as PROTOCOL +PyOpenSSLContext = None + + class X509Adapter(HTTPAdapter): r"""Adapter for use with X.509 certificates. @@ -81,6 +83,7 @@ class X509Adapter(HTTPAdapter): """ def __init__(self, *args, **kwargs): + self._import_pyopensslcontext() self._check_version() cert_bytes = kwargs.pop('cert_bytes', None) pk_bytes = kwargs.pop('pk_bytes', None) @@ -118,6 +121,21 @@ def proxy_manager_for(self, *args, **kwargs): kwargs['ssl_context'] = self.ssl_context return super(X509Adapter, self).proxy_manager_for(*args, **kwargs) + def _import_pyopensslcontext(self): + global PyOpenSSLContext + + if requests.__build__ < 0x021200: + PyOpenSSLContext = None + else: + try: + from requests.packages.urllib3.contrib.pyopenssl \ + import PyOpenSSLContext + except ImportError: + try: + from urllib3.contrib.pyopenssl import PyOpenSSLContext + except ImportError: + PyOpenSSLContext = None + def _check_version(self): if PyOpenSSLContext is None: raise exc.VersionMismatchError( diff --git a/tests/certs/test_cert.p12 b/tests/certs/test_cert.p12 deleted file mode 100644 index fddf5d65..00000000 Binary files a/tests/certs/test_cert.p12 and /dev/null differ diff --git a/tests/test_compat.py b/tests/test_compat.py deleted file mode 100644 index dfe3016b..00000000 --- a/tests/test_compat.py +++ /dev/null @@ -1,14 +0,0 @@ -import pytest - -try: - import OpenSSL -except ImportError: - PYOPENSSL_AVAILABLE = False -else: - PYOPENSSL_AVAILABLE = True - -@pytest.mark.skipif(PYOPENSSL_AVAILABLE, - reason="Requires OpenSSL to be missing to test fallback") -def test_pyopensslcontext_is_none_when_package_missing(): - import requests_toolbelt._compat - assert requests_toolbelt._compat.PyOpenSSLContext is None diff --git a/tests/test_x509_adapter.py b/tests/test_x509_adapter.py index bcc00b0f..84a10e2c 100644 --- a/tests/test_x509_adapter.py +++ b/tests/test_x509_adapter.py @@ -4,50 +4,48 @@ import pytest try: - from OpenSSL.crypto import load_pkcs12 + import OpenSSL except ImportError: PYOPENSSL_AVAILABLE = False else: PYOPENSSL_AVAILABLE = True from requests_toolbelt.adapters.x509 import X509Adapter + from cryptography import x509 from cryptography.hazmat.primitives.serialization import ( Encoding, PrivateFormat, - NoEncryption, - BestAvailableEncryption + BestAvailableEncryption, + load_pem_private_key, ) + import trustme from requests_toolbelt import exceptions as exc from . import get_betamax REQUESTS_SUPPORTS_SSL_CONTEXT = requests.__build__ >= 0x021200 +pytestmark = pytest.mark.filterwarnings( + "ignore:'urllib3.contrib.pyopenssl' module is deprecated:DeprecationWarning") + class TestX509Adapter(unittest.TestCase): """Tests a simple requests.get() call using a .p12 cert. """ def setUp(self): - with open('./tests/certs/test_cert.p12', 'rb') as pkcs12_file: - self.pkcs12_data = pkcs12_file.read() - self.pkcs12_password_bytes = "test".encode('utf8') self.session = requests.Session() - @pytest.mark.xfail @pytest.mark.skipif(not REQUESTS_SUPPORTS_SSL_CONTEXT, - reason="Requires Requests v2.12.0 or later") + reason="Requires Requests v2.12.0 or later") @pytest.mark.skipif(not PYOPENSSL_AVAILABLE, - reason="Requires OpenSSL") + reason="Requires OpenSSL") def test_x509_pem(self): - p12 = load_pkcs12(self.pkcs12_data, self.pkcs12_password_bytes) - cert_bytes = p12.get_certificate().to_cryptography().public_bytes(Encoding.PEM) - pk_bytes = p12.get_privatekey().\ - to_cryptography_key().\ - private_bytes(Encoding.PEM, PrivateFormat.PKCS8, - BestAvailableEncryption(self.pkcs12_password_bytes)) + ca = trustme.CA() + cert = ca.issue_cert(u'pkiprojecttest01.dev.labs.internal') + cert_bytes = cert.cert_chain_pems[0].bytes() + pk_bytes = cert.private_key_pem.bytes() - adapter = X509Adapter(max_retries=3, cert_bytes=cert_bytes, - pk_bytes=pk_bytes, password=self.pkcs12_password_bytes) + adapter = X509Adapter(max_retries=3, cert_bytes=cert_bytes, pk_bytes=pk_bytes) self.session.mount('https://', adapter) recorder = get_betamax(self.session) with recorder.use_cassette('test_x509_adapter_pem'): @@ -56,16 +54,21 @@ def test_x509_pem(self): assert r.status_code == 200 assert r.text - @pytest.mark.xfail @pytest.mark.skipif(not REQUESTS_SUPPORTS_SSL_CONTEXT, reason="Requires Requests v2.12.0 or later") @pytest.mark.skipif(not PYOPENSSL_AVAILABLE, reason="Requires OpenSSL") - def test_x509_der(self): - p12 = load_pkcs12(self.pkcs12_data, self.pkcs12_password_bytes) - cert_bytes = p12.get_certificate().to_cryptography().public_bytes(Encoding.DER) - pk_bytes = p12.get_privatekey().to_cryptography_key().private_bytes(Encoding.DER, PrivateFormat.PKCS8, NoEncryption()) - adapter = X509Adapter(max_retries=3, cert_bytes=cert_bytes, pk_bytes=pk_bytes, encoding=Encoding.DER) + def test_x509_der_and_password(self): + ca = trustme.CA() + cert = ca.issue_cert(u'pkiprojecttest01.dev.labs.internal') + cert_bytes = x509.load_pem_x509_certificate( + cert.cert_chain_pems[0].bytes()).public_bytes(Encoding.DER) + pem_pk = load_pem_private_key(cert.private_key_pem.bytes(), password=None) + pk_bytes = pem_pk.private_bytes(Encoding.DER, PrivateFormat.PKCS8, + BestAvailableEncryption(self.pkcs12_password_bytes)) + + adapter = X509Adapter(max_retries=3, cert_bytes=cert_bytes, pk_bytes=pk_bytes, + password=self.pkcs12_password_bytes, encoding=Encoding.DER) self.session.mount('https://', adapter) recorder = get_betamax(self.session) with recorder.use_cassette('test_x509_adapter_der'): diff --git a/tox.ini b/tox.ini index be886a85..253c8bac 100644 --- a/tox.ini +++ b/tox.ini @@ -18,8 +18,9 @@ deps = pyopenssl ndg-httpsclient betamax>0.5.0 + trustme commands = - py.test {posargs} + pytest -W error::DeprecationWarning {posargs} [testenv:noopenssl] basepython = python3.7 @@ -30,7 +31,7 @@ deps = mock;python_version<"3.3" betamax>0.5.0 commands = - py.test {posargs} + pytest -W error::DeprecationWarning {posargs} [testenv:py27-flake8] basepython = python2.7