-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
conftest.py
127 lines (96 loc) · 3.93 KB
/
conftest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import collections
import contextlib
import threading
import platform
import sys
import pytest
import trustme
from tornado import web, ioloop
from dummyserver.handlers import TestingApp
from dummyserver.server import run_tornado_app
from dummyserver.server import (
DEFAULT_CA,
DEFAULT_CA_KEY,
CLIENT_INTERMEDIATE_PEM,
CLIENT_NO_INTERMEDIATE_PEM,
CLIENT_INTERMEDIATE_KEY,
HAS_IPV6,
)
# Tornado breaks on the event loop that Python 3.8+ uses by default on Windows
@pytest.fixture(scope="session", autouse=True)
def configure_windows_event_loop():
    """Install the selector event loop policy on Windows with Python 3.8+.

    Session-scoped and autouse, so it runs once without being requested.
    On any other interpreter version or platform it does nothing.
    """
    if sys.version_info < (3, 8) or platform.system() != "Windows":
        return

    # Imported lazily: asyncio is only touched on the affected configuration
    import asyncio

    selector_policy = asyncio.WindowsSelectorEventLoopPolicy()
    asyncio.set_event_loop_policy(selector_policy)
@pytest.fixture(scope="session")
def certs_dir(tmp_path_factory):
    """Create a session-wide temporary directory holding client certificate files.

    The existing root CA (DEFAULT_CA / DEFAULT_CA_KEY) is loaded, a child CA is
    created from it, and a certificate for "example.com" is issued by that
    child CA. Three files are written into the directory:

    * CLIENT_INTERMEDIATE_KEY: the certificate's private key
    * CLIENT_INTERMEDIATE_PEM: the first two entries of the certificate chain
    * CLIENT_NO_INTERMEDIATE_PEM: the first entry of the certificate chain only

    :param tmp_path_factory: pytest's built-in temporary path factory fixture.
    :yields: the directory containing the files above.
    """
    tmpdir = tmp_path_factory.mktemp("certs")

    # Start from existing root CA as we don't want to change the server certificate yet.
    # The files are only needed for these two reads, so they are closed right
    # away instead of staying open for the whole test session.
    with open(DEFAULT_CA, "rb") as crt, open(DEFAULT_CA_KEY, "rb") as key:
        root_ca = trustme.CA.from_pem(crt.read(), key.read())

    # client cert chain
    intermediate_ca = root_ca.create_child_ca()
    cert = intermediate_ca.issue_cert(u"example.com")
    cert.private_key_pem.write_to_path(str(tmpdir / CLIENT_INTERMEDIATE_KEY))

    # Write the client cert and the intermediate CA
    client_cert = str(tmpdir / CLIENT_INTERMEDIATE_PEM)
    cert.cert_chain_pems[0].write_to_path(client_cert)
    cert.cert_chain_pems[1].write_to_path(client_cert, append=True)

    # Write only the client cert
    cert.cert_chain_pems[0].write_to_path(str(tmpdir / CLIENT_NO_INTERMEDIATE_PEM))

    yield tmpdir
# Connection details of a running test server: the host it was started for,
# the port it listens on, and the path of the CA certificate file.
ServerConfig = collections.namedtuple(
    "ServerConfig",
    ("host", "port", "ca_certs"),
)
@contextlib.contextmanager
def run_server_in_thread(scheme, host, tmpdir, ca, server_cert):
    """Run a Tornado TestingApp server in a background thread for the ``with`` body.

    The CA certificate, the server's private key and the first certificate of
    the server's chain are written to ``ca.pem``, ``server.key`` and
    ``server.pem`` inside ``tmpdir``. The server is then started through
    ``run_tornado_app`` and the IOLoop is run in a separate thread.

    :param scheme: scheme passed through to ``run_tornado_app``.
    :param host: host passed through to ``run_tornado_app`` and reported back
        in the yielded config.
    :param tmpdir: directory (supporting the ``/`` operator) that receives the
        certificate files.
    :param ca: object exposing ``cert_pem``, e.g. a ``trustme.CA``.
    :param server_cert: object exposing ``private_key_pem`` and
        ``cert_chain_pems``, e.g. a certificate issued by ``ca``.
    :yields: ``ServerConfig(host, port, ca_cert_path)`` for the running server.
    """
    ca_cert_path = str(tmpdir / "ca.pem")
    server_cert_path = str(tmpdir / "server.pem")
    server_key_path = str(tmpdir / "server.key")
    ca.cert_pem.write_to_path(ca_cert_path)
    server_cert.private_key_pem.write_to_path(server_key_path)
    server_cert.cert_chain_pems[0].write_to_path(server_cert_path)
    server_certs = {"keyfile": server_key_path, "certfile": server_cert_path}

    io_loop = ioloop.IOLoop.current()
    app = web.Application([(r".*", TestingApp)])
    server, port = run_tornado_app(app, io_loop, server_certs, scheme, host)
    server_thread = threading.Thread(target=io_loop.start)
    server_thread.start()

    try:
        yield ServerConfig(host, port, ca_cert_path)
    finally:
        # Always shut down, even when the ``with`` body raises; otherwise the
        # non-daemon thread would keep running the loop.
        # The loop runs in server_thread, so the stop requests are handed to
        # it as callbacks rather than being invoked from this thread.
        io_loop.add_callback(server.stop)
        io_loop.add_callback(io_loop.stop)
        server_thread.join()
@pytest.fixture
def no_san_server(tmp_path_factory):
    """Yield the config of an HTTPS server on "localhost" using a common-name-only cert."""
    cert_dir = tmp_path_factory.mktemp("certs")
    authority = trustme.CA()
    # The hostname is given as the common name only; no subject alternative names
    leaf_cert = authority.issue_cert(common_name=u"localhost")

    with run_server_in_thread(
        "https", "localhost", cert_dir, authority, leaf_cert
    ) as server_config:
        yield server_config
@pytest.fixture
def ip_san_server(tmp_path_factory):
    """Yield the config of an HTTPS server on 127.0.0.1 whose cert is issued for that IP."""
    cert_dir = tmp_path_factory.mktemp("certs")
    authority = trustme.CA()
    # The IP address is passed as an identity, i.e. a Subject Alternative Name
    leaf_cert = authority.issue_cert(u"127.0.0.1")

    with run_server_in_thread(
        "https", "127.0.0.1", cert_dir, authority, leaf_cert
    ) as server_config:
        yield server_config
@pytest.fixture
def ipv6_addr_server(tmp_path_factory):
    """Yield the config of an HTTPS server on ::1 with the address as the cert's common name.

    The requesting test is skipped when HAS_IPV6 is false.
    """
    if not HAS_IPV6:
        pytest.skip("Only runs on IPv6 systems")

    cert_dir = tmp_path_factory.mktemp("certs")
    authority = trustme.CA()
    # The IPv6 address goes in the Common Name
    leaf_cert = authority.issue_cert(common_name=u"::1")

    with run_server_in_thread(
        "https", "::1", cert_dir, authority, leaf_cert
    ) as server_config:
        yield server_config
@pytest.fixture
def ipv6_san_server(tmp_path_factory):
    """Yield the config of an HTTPS server on ::1 whose cert is issued for that address.

    The requesting test is skipped when HAS_IPV6 is false.
    """
    if not HAS_IPV6:
        pytest.skip("Only runs on IPv6 systems")

    cert_dir = tmp_path_factory.mktemp("certs")
    authority = trustme.CA()
    # The IPv6 address is passed as an identity, i.e. a Subject Alternative Name
    leaf_cert = authority.issue_cert(u"::1")

    with run_server_in_thread(
        "https", "::1", cert_dir, authority, leaf_cert
    ) as server_config:
        yield server_config