forked from requests/toolbelt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_x509_adapter.py
80 lines (68 loc) · 3.19 KB
/
test_x509_adapter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# -*- coding: utf-8 -*-
import requests
import unittest
import pytest
try:
from OpenSSL.crypto import load_pkcs12
except ImportError:
PYOPENSSL_AVAILABLE = False
else:
PYOPENSSL_AVAILABLE = True
from requests_toolbelt.adapters.x509 import X509Adapter
from cryptography.hazmat.primitives.serialization import (
Encoding,
PrivateFormat,
NoEncryption,
BestAvailableEncryption
)
from requests_toolbelt import exceptions as exc
from . import get_betamax
# True when the installed requests reports a build number of at least
# 0x021200. The skip reasons in this module tie that threshold to
# Requests v2.12.0.
REQUESTS_SUPPORTS_SSL_CONTEXT = requests.__build__ >= 0x021200
class TestX509Adapter(unittest.TestCase):
    """Tests ``X509Adapter`` using credentials taken from a .p12 bundle.

    Two tests mount the adapter on a session and perform a recorded GET,
    once with PEM-encoded material and once with DER-encoded material.
    Both are marked ``xfail``; the reason is not recorded in this file.
    A third test checks that constructing the adapter raises
    ``VersionMismatchError``; it is skipped whenever
    ``REQUESTS_SUPPORTS_SSL_CONTEXT`` is true.
    """

    def setUp(self):
        """Read the PKCS#12 fixture and create a fresh session per test."""
        # The path is relative to the current working directory.
        with open('./tests/certs/test_cert.p12', 'rb') as pkcs12_file:
            self.pkcs12_data = pkcs12_file.read()

        self.pkcs12_password_bytes = "test".encode('utf8')
        self.session = requests.Session()
        # Release the session's adapters/connection pools after every test,
        # whether it passes, fails or errors; previously it was never closed.
        self.addCleanup(self.session.close)

    def _assert_get_succeeds(self, adapter, cassette_name):
        """Mount ``adapter`` for HTTPS and check a recorded GET succeeds.

        :param adapter: adapter instance to mount on ``self.session`` for
            the ``https://`` prefix.
        :param cassette_name: name of the Betamax cassette used to
            record/replay the request.
        """
        self.session.mount('https://', adapter)
        recorder = get_betamax(self.session)
        with recorder.use_cassette(cassette_name):
            # verify=False turns off server certificate verification for
            # this request.
            r = self.session.get(
                'https://pkiprojecttest01.dev.labs.internal/', verify=False)

        assert r.status_code == 200
        assert r.text

    @pytest.mark.xfail
    @pytest.mark.skipif(not REQUESTS_SUPPORTS_SSL_CONTEXT,
                        reason="Requires Requests v2.12.0 or later")
    @pytest.mark.skipif(not PYOPENSSL_AVAILABLE,
                        reason="Requires OpenSSL")
    def test_x509_pem(self):
        """GET via an adapter built from a PEM cert and an encrypted PEM key."""
        p12 = load_pkcs12(self.pkcs12_data, self.pkcs12_password_bytes)
        cert_bytes = p12.get_certificate().to_cryptography().public_bytes(
            Encoding.PEM)
        # The private key is re-serialised encrypted with the same password
        # that is then handed to the adapter.
        pk_bytes = p12.get_privatekey().to_cryptography_key().private_bytes(
            Encoding.PEM,
            PrivateFormat.PKCS8,
            BestAvailableEncryption(self.pkcs12_password_bytes))

        adapter = X509Adapter(max_retries=3, cert_bytes=cert_bytes,
                              pk_bytes=pk_bytes,
                              password=self.pkcs12_password_bytes)
        self._assert_get_succeeds(adapter, 'test_x509_adapter_pem')

    @pytest.mark.xfail
    @pytest.mark.skipif(not REQUESTS_SUPPORTS_SSL_CONTEXT,
                        reason="Requires Requests v2.12.0 or later")
    @pytest.mark.skipif(not PYOPENSSL_AVAILABLE,
                        reason="Requires OpenSSL")
    def test_x509_der(self):
        """GET via an adapter built from a DER cert and an unencrypted DER key."""
        p12 = load_pkcs12(self.pkcs12_data, self.pkcs12_password_bytes)
        cert_bytes = p12.get_certificate().to_cryptography().public_bytes(
            Encoding.DER)
        pk_bytes = p12.get_privatekey().to_cryptography_key().private_bytes(
            Encoding.DER, PrivateFormat.PKCS8, NoEncryption())

        # No password is passed here because the key is serialised with
        # NoEncryption(); the encoding is stated explicitly as DER.
        adapter = X509Adapter(max_retries=3, cert_bytes=cert_bytes,
                              pk_bytes=pk_bytes, encoding=Encoding.DER)
        self._assert_get_succeeds(adapter, 'test_x509_adapter_der')

    @pytest.mark.skipif(REQUESTS_SUPPORTS_SSL_CONTEXT,
                        reason="Will not raise exc")
    def test_requires_new_enough_requests(self):
        """Constructing the adapter raises ``VersionMismatchError``."""
        with pytest.raises(exc.VersionMismatchError):
            X509Adapter()