Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix urllib3 warning #336

Merged
merged 5 commits into from Oct 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions dev-requirements.txt
Expand Up @@ -2,3 +2,4 @@ pytest
mock;python_version<"3.3"
pyopenssl
git+git://github.com/sigmavirus24/betamax
trustme
13 changes: 0 additions & 13 deletions requests_toolbelt/_compat.py
Expand Up @@ -49,18 +49,6 @@
except ImportError:
from urllib3.contrib import appengine as gaecontrib

if requests.__build__ < 0x021200:
PyOpenSSLContext = None
else:
try:
from requests.packages.urllib3.contrib.pyopenssl \
import PyOpenSSLContext
except ImportError:
try:
from urllib3.contrib.pyopenssl import PyOpenSSLContext
except ImportError:
PyOpenSSLContext = None

PY3 = sys.version_info > (3, 0)

if PY3:
Expand Down Expand Up @@ -320,5 +308,4 @@ def from_httplib(cls, message): # Python 2
'urlencode',
'gaecontrib',
'urljoin',
'PyOpenSSLContext',
)
20 changes: 19 additions & 1 deletion requests_toolbelt/adapters/x509.py
Expand Up @@ -18,7 +18,6 @@
from requests.adapters import HTTPAdapter
import requests

from .._compat import PyOpenSSLContext
from .. import exceptions as exc

"""
Expand All @@ -32,6 +31,9 @@
from _ssl import PROTOCOL_SSLv23 as PROTOCOL


PyOpenSSLContext = None


class X509Adapter(HTTPAdapter):
r"""Adapter for use with X.509 certificates.

Expand Down Expand Up @@ -81,6 +83,7 @@ class X509Adapter(HTTPAdapter):
"""

def __init__(self, *args, **kwargs):
self._import_pyopensslcontext()
self._check_version()
cert_bytes = kwargs.pop('cert_bytes', None)
pk_bytes = kwargs.pop('pk_bytes', None)
Expand Down Expand Up @@ -118,6 +121,21 @@ def proxy_manager_for(self, *args, **kwargs):
kwargs['ssl_context'] = self.ssl_context
return super(X509Adapter, self).proxy_manager_for(*args, **kwargs)

def _import_pyopensslcontext(self):
global PyOpenSSLContext

if requests.__build__ < 0x021200:
PyOpenSSLContext = None
else:
try:
from requests.packages.urllib3.contrib.pyopenssl \
import PyOpenSSLContext
except ImportError:
try:
from urllib3.contrib.pyopenssl import PyOpenSSLContext
except ImportError:
PyOpenSSLContext = None

def _check_version(self):
if PyOpenSSLContext is None:
raise exc.VersionMismatchError(
Expand Down
Binary file removed tests/certs/test_cert.p12
Binary file not shown.
14 changes: 0 additions & 14 deletions tests/test_compat.py

This file was deleted.

49 changes: 26 additions & 23 deletions tests/test_x509_adapter.py
Expand Up @@ -4,50 +4,48 @@
import pytest

try:
from OpenSSL.crypto import load_pkcs12
import OpenSSL
except ImportError:
PYOPENSSL_AVAILABLE = False
else:
PYOPENSSL_AVAILABLE = True
from requests_toolbelt.adapters.x509 import X509Adapter
from cryptography import x509
from cryptography.hazmat.primitives.serialization import (
Encoding,
PrivateFormat,
NoEncryption,
BestAvailableEncryption
BestAvailableEncryption,
load_pem_private_key,
)
import trustme

from requests_toolbelt import exceptions as exc
from . import get_betamax

REQUESTS_SUPPORTS_SSL_CONTEXT = requests.__build__ >= 0x021200

pytestmark = pytest.mark.filterwarnings(
"ignore:'urllib3.contrib.pyopenssl' module is deprecated:DeprecationWarning")


class TestX509Adapter(unittest.TestCase):
"""Tests a simple requests.get() call using a .p12 cert.
"""
def setUp(self):
with open('./tests/certs/test_cert.p12', 'rb') as pkcs12_file:
self.pkcs12_data = pkcs12_file.read()

self.pkcs12_password_bytes = "test".encode('utf8')
self.session = requests.Session()

@pytest.mark.xfail
@pytest.mark.skipif(not REQUESTS_SUPPORTS_SSL_CONTEXT,
reason="Requires Requests v2.12.0 or later")
reason="Requires Requests v2.12.0 or later")
@pytest.mark.skipif(not PYOPENSSL_AVAILABLE,
reason="Requires OpenSSL")
reason="Requires OpenSSL")
def test_x509_pem(self):
p12 = load_pkcs12(self.pkcs12_data, self.pkcs12_password_bytes)
cert_bytes = p12.get_certificate().to_cryptography().public_bytes(Encoding.PEM)
pk_bytes = p12.get_privatekey().\
to_cryptography_key().\
private_bytes(Encoding.PEM, PrivateFormat.PKCS8,
BestAvailableEncryption(self.pkcs12_password_bytes))
ca = trustme.CA()
cert = ca.issue_cert(u'pkiprojecttest01.dev.labs.internal')
cert_bytes = cert.cert_chain_pems[0].bytes()
pk_bytes = cert.private_key_pem.bytes()

adapter = X509Adapter(max_retries=3, cert_bytes=cert_bytes,
pk_bytes=pk_bytes, password=self.pkcs12_password_bytes)
adapter = X509Adapter(max_retries=3, cert_bytes=cert_bytes, pk_bytes=pk_bytes)
self.session.mount('https://', adapter)
recorder = get_betamax(self.session)
with recorder.use_cassette('test_x509_adapter_pem'):
Expand All @@ -56,16 +54,21 @@ def test_x509_pem(self):
assert r.status_code == 200
assert r.text

@pytest.mark.xfail
@pytest.mark.skipif(not REQUESTS_SUPPORTS_SSL_CONTEXT,
reason="Requires Requests v2.12.0 or later")
@pytest.mark.skipif(not PYOPENSSL_AVAILABLE,
reason="Requires OpenSSL")
def test_x509_der(self):
p12 = load_pkcs12(self.pkcs12_data, self.pkcs12_password_bytes)
cert_bytes = p12.get_certificate().to_cryptography().public_bytes(Encoding.DER)
pk_bytes = p12.get_privatekey().to_cryptography_key().private_bytes(Encoding.DER, PrivateFormat.PKCS8, NoEncryption())
adapter = X509Adapter(max_retries=3, cert_bytes=cert_bytes, pk_bytes=pk_bytes, encoding=Encoding.DER)
def test_x509_der_and_password(self):
ca = trustme.CA()
cert = ca.issue_cert(u'pkiprojecttest01.dev.labs.internal')
cert_bytes = x509.load_pem_x509_certificate(
cert.cert_chain_pems[0].bytes()).public_bytes(Encoding.DER)
pem_pk = load_pem_private_key(cert.private_key_pem.bytes(), password=None)
pk_bytes = pem_pk.private_bytes(Encoding.DER, PrivateFormat.PKCS8,
BestAvailableEncryption(self.pkcs12_password_bytes))

adapter = X509Adapter(max_retries=3, cert_bytes=cert_bytes, pk_bytes=pk_bytes,
password=self.pkcs12_password_bytes, encoding=Encoding.DER)
self.session.mount('https://', adapter)
recorder = get_betamax(self.session)
with recorder.use_cassette('test_x509_adapter_der'):
Expand Down
5 changes: 3 additions & 2 deletions tox.ini
Expand Up @@ -18,8 +18,9 @@ deps =
pyopenssl
ndg-httpsclient
betamax>0.5.0
trustme
commands =
py.test {posargs}
pytest -W error::DeprecationWarning {posargs}

[testenv:noopenssl]
basepython = python3.7
Expand All @@ -30,7 +31,7 @@ deps =
mock;python_version<"3.3"
betamax>0.5.0
commands =
py.test {posargs}
pytest -W error::DeprecationWarning {posargs}

[testenv:py27-flake8]
basepython = python2.7
Expand Down