New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add support for password protected certificate files #143
Changes from 9 commits
4ec225f
292af5e
6a59d60
14d8af2
9f936d7
92c567b
f9fd2b7
01955b8
cb80a85
d1ded21
3682f73
2f0dbda
d5fa938
de8b785
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,12 +8,14 @@ | |
import gzip | ||
import hashlib | ||
import httplib2 | ||
from http.server import BaseHTTPRequestHandler, HTTPServer | ||
import os | ||
import random | ||
import re | ||
import shutil | ||
import six | ||
import socket | ||
import ssl | ||
import struct | ||
import sys | ||
import threading | ||
|
@@ -23,6 +25,12 @@ | |
from six.moves import http_client, queue | ||
|
||
|
||
SERVER_CERTFILE = 'tests/testdata/test_server_cert.pem' | ||
CLIENT_CERTFILE = 'tests/testdata/test_cert.pem' | ||
CLIENT_CERT_PASSWORD = '12345' | ||
CLIENT_CERT_SERIAL = '5ECC68A6F89CAA16D032C838CCDDC7E577264CDB' | ||
|
||
|
||
@contextlib.contextmanager | ||
def assert_raises(exc_type): | ||
def _name(t): | ||
|
@@ -260,6 +268,61 @@ def getresponse(self): | |
raise http_client.BadStatusLine("") | ||
|
||
|
||
def _get_free_port(): | ||
s = socket.socket(socket.AF_INET, type=socket.SOCK_STREAM) | ||
s.bind(('localhost', 0)) | ||
address, port = s.getsockname() | ||
s.close() | ||
return port | ||
|
||
|
||
class _MockServerRequestHandler(BaseHTTPRequestHandler): | ||
"""Server request handler which always returns 200 and saves client cert info.""" | ||
def do_GET(self): | ||
# save client cert | ||
self.server.last_client_cert = self.connection.getpeercert() | ||
# Process an HTTP GET request and return a response with an HTTP 200 status. | ||
self.send_response(200) | ||
self.end_headers() | ||
return | ||
|
||
|
||
class MockHttpServer(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There already exists code that does same function, right below this class. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I respectfully disagree with that statement. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, thanks for criticism, I very much agree that code turned out less simple than it should. So if you disagree here, please wait for me to add https/cert support in existing server stub. There is no good reason to have two testing server implementations. |
||
"""This creates local http server in a separate thread.""" | ||
def __init__(self, handler=None, port=0, use_ssl=False): | ||
self.handler = handler if handler else _MockServerRequestHandler | ||
self.port = port if port else _get_free_port() | ||
self.use_ssl = use_ssl | ||
self.client_certfile = CLIENT_CERTFILE | ||
self.certfile = SERVER_CERTFILE | ||
|
||
def __enter__(self): | ||
self.server = HTTPServer(('localhost', self.port), self.handler) | ||
|
||
# wrap socket when SSL server requested | ||
if self.use_ssl: | ||
context = ssl.SSLContext(ssl.PROTOCOL_TLS) | ||
# ask client to present own cert for mutual auth | ||
context.verify_mode = ssl.CERT_OPTIONAL | ||
if self.client_certfile: | ||
# avoid verification failure by preloading matching client cert | ||
context.load_verify_locations(self.client_certfile) | ||
# load server cert | ||
context.load_cert_chain(self.certfile) | ||
self.server.socket = context.wrap_socket( | ||
sock=self.server.socket, server_side=True) | ||
|
||
# Start running mock server in a separate thread. | ||
# Daemon threads automatically shut down when the main process exits. | ||
server_thread = threading.Thread(target=self.server.serve_forever) | ||
server_thread.setDaemon(True) | ||
server_thread.start() | ||
return self | ||
|
||
def __exit__(self, type, value, traceback): | ||
self.server.shutdown() | ||
|
||
|
||
@contextlib.contextmanager | ||
def server_socket(fun, request_count=1, timeout=5): | ||
gresult = [None] | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -62,6 +62,66 @@ def test_get_via_https_key_cert(): | |
pass | ||
|
||
|
||
def test_get_via_https_key_cert_password(): | ||
# At this point I can only test | ||
# that the key and cert files are passed in | ||
# correctly to httplib. It would be nice to have | ||
# a real https endpoint to test against. | ||
http = httplib2.Http(timeout=2) | ||
http.add_certificate("akeyfile", "acertfile", "bitworking.org", "apassword") | ||
try: | ||
http.request("https://bitworking.org", "GET") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Still trying to hit external website. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. They are all transient, were created because local server stub was not implemented or not capable of required features at that time. See file docstring:
|
||
except AttributeError: | ||
assert http.connections["https:bitworking.org"].key_file == "akeyfile" | ||
assert http.connections["https:bitworking.org"].cert_file == "acertfile" | ||
assert http.connections["https:bitworking.org"].key_password == "apassword" | ||
except IOError: | ||
# Skip on 3.2 | ||
pass | ||
|
||
try: | ||
http.request("https://notthere.bitworking.org", "GET") | ||
except httplib2.ServerNotFoundError: | ||
assert http.connections["https:notthere.bitworking.org"].key_file is None | ||
assert http.connections["https:notthere.bitworking.org"].cert_file is None | ||
assert http.connections["https:notthere.bitworking.org"].key_password is None | ||
except IOError: | ||
# Skip on 3.2 | ||
pass | ||
|
||
|
||
def test_get_via_https_key_cert_password_with_pem(): | ||
# At this point I can only test | ||
# that the key and cert files are passed in | ||
# correctly to httplib. It would be nice to have | ||
# a real https endpoint to test against. | ||
http = httplib2.Http(timeout=2) | ||
http.add_certificate(tests.CLIENT_CERTFILE, tests.CLIENT_CERTFILE, | ||
"bitworking.org", tests.CLIENT_CERT_PASSWORD) | ||
http.request("https://bitworking.org", "GET") | ||
|
||
# try invalid password | ||
http = httplib2.Http(timeout=2) | ||
http.add_certificate(tests.CLIENT_CERTFILE, tests.CLIENT_CERTFILE, | ||
"bitworking.org", "invalid") | ||
with tests.assert_raises(ssl.SSLError): | ||
http.request("https://bitworking.org", "GET") | ||
|
||
|
||
def test_get_via_https_key_cert_password_with_pem_local_server(): | ||
with tests.MockHttpServer(use_ssl=True) as server: | ||
# load matching server cert to avoid verification failure | ||
http = httplib2.Http(ca_certs=server.certfile) | ||
# load client cert to be presented when server asks for it | ||
http.add_certificate(tests.CLIENT_CERTFILE, tests.CLIENT_CERTFILE, | ||
'', tests.CLIENT_CERT_PASSWORD) | ||
url = 'https://localhost:{port}/'.format(port=server.port) | ||
response, content = http.request(url, "GET") | ||
assert response.status == 200 | ||
# verify that client cert was presented with matching serial number | ||
assert server.server.last_client_cert['serialNumber'] == tests.CLIENT_CERT_SERIAL | ||
|
||
|
||
def test_ssl_invalid_ca_certs_path(): | ||
# Test that we get an ssl.SSLError when specifying a non-existent CA | ||
# certs file. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just this minor spaces around '=', CI linter should've caught this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done