Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fixes #86] Close the HTTP pool when service is stopped. #140

Merged
merged 18 commits into from
Feb 14, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions .github/PULL_REQUEST_TEMPLATE
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
## See CONTRIBUTING.rst for more details.

## Contributor Checklist:

* [ ] Created a newsfragment in src/txacme/newsfragments/.
* [ ] Updated the automated tests.
* [ ] The changes pass minimal style checks.
58 changes: 58 additions & 0 deletions CONTRIBUTING.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
Contributing to txacme
######################

We use `tox` to run the test in a controller environment.

Each change should consider covering the followings:

* Create a release notes fragment. See section below.
* Write automated tests to the point of having at least 100% code coverage.
* Documenting the API.
* Update the documentation with usage examples.


Documenting the changes
-----------------------

`towncrier <https://github.com/hawkowl/towncrier>`_
is used to manage the release notes.

Beside the normal docstring and API documentation,
each change which is visible to the users of txame should be documented in
the release notes.

To avoid merge conflict in the release notes files, each item of the release
notes is create in a separate file located in `src/txacme/newsfragments/`

The file will have the following format: ISSUE_ID.ITEM_TYPE.
`ISSUE_ID` is the GitHub Issue ID targeted by this branch.

`ITEM_TYPE` is one of the
`default types <https://github.com/hawkowl/towncrier#news-fragments>`_
supported by Towncrier.


Executing tests and checking coverage
-------------------------------------

You can run all tests in a specific environment, or just a single test::

$ tox -e py27-twlatest txacme.test.test_service
$ tox -e py27-twlatest \
txacme.test.test_service.AcmeIssuingServiceTests.test_timer_errors

You can check the test coverage, and diff coverage by running the dedicated
`coverage-report` tox env::

$ tox -e py27-twlatest,coverage-report

There is a tox environment dedicated to code style checks::

$ tox -e flake8

If executing the `tox` environment is too slow for you, you can always enable
a specific environment and execute the test with `trial`::

$ . .tox/py27-twlatest/bin/activate
$ pip install -e .
$ trial txacme.test.test_service.AcmeIssuingServiceTests.test_timer_errors
14 changes: 2 additions & 12 deletions src/integration/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,7 @@ class LetsEncryptStagingTLSSNI01Tests(ClientTestsMixin, TestCase):
ENDPOINT = ENDPOINT.encode('utf-8')

def _create_client(self, key):
return (
Client.from_url(reactor, LETSENCRYPT_STAGING_DIRECTORY, key=key)
.addCallback(tap(
lambda client: self.addCleanup(
client._client._treq._agent._pool.closeCachedConnections)))
)
return Client.from_url(reactor, LETSENCRYPT_STAGING_DIRECTORY, key=key)

def _create_responder(self):
action = start_action(action_type=u'integration:create_responder')
Expand Down Expand Up @@ -231,12 +226,7 @@ class LetsEncryptStagingLibcloudTests(ClientTestsMixin, TestCase):
skip = 'Must provide $ACME_HOST and $LIBCLOUD_*'

def _create_client(self, key):
return (
Client.from_url(reactor, LETSENCRYPT_STAGING_DIRECTORY, key=key)
.addCallback(tap(
lambda client: self.addCleanup(
client._client._treq._agent._pool.closeCachedConnections)))
)
return Client.from_url(reactor, LETSENCRYPT_STAGING_DIRECTORY, key=key)

def _create_responder(self):
with start_action(action_type=u'integration:create_responder'):
Expand Down
29 changes: 26 additions & 3 deletions src/txacme/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def _default_client(jws_client, reactor, key, alg):
if jws_client is None:
pool = HTTPConnectionPool(reactor)
agent = Agent(reactor, pool=pool)
jws_client = JWSClient(HTTPClient(agent=agent), key, alg)
jws_client = JWSClient(agent, key, alg)
return jws_client


Expand Down Expand Up @@ -137,6 +137,17 @@ def from_url(cls, reactor, url, key, alg=RS256, jws_client=None):
.addCallback(cls, reactor, key, jws_client)
.addActionFinish())

def stop(self):
"""
Stops the client operation.

This cancels pending operations and does cleanup.

:return: When operation is done.
:rtype: Deferred[None]
"""
return self._client.stop()

def register(self, new_reg=None):
"""
Create a new registration with the ACME server.
Expand Down Expand Up @@ -666,9 +677,10 @@ class JWSClient(object):
"""
timeout = 40

def __init__(self, treq_client, key, alg,
def __init__(self, agent, key, alg,
user_agent=u'txacme/{}'.format(__version__).encode('ascii')):
self._treq = treq_client
self._treq = HTTPClient(agent=agent)
self._agent = agent
self._key = key
self._alg = alg
self._user_agent = user_agent
Expand Down Expand Up @@ -771,6 +783,17 @@ def _send_request(self, method, url, *args, **kwargs):
b'content-type', [None])[0])))
.addActionFinish())

def stop(self):
"""
Stops the operation.

This cancels pending operations and does cleanup.
mithrandi marked this conversation as resolved.
Show resolved Hide resolved

:return: When operation is done.
:rtype: Deferred[None]
"""
return self._agent._pool.closeCachedConnections()
mithrandi marked this conversation as resolved.
Show resolved Hide resolved

def head(self, url, *args, **kwargs):
"""
Send HEAD request without checking the response.
Expand Down
2 changes: 2 additions & 0 deletions src/txacme/newsfragments/86.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
txacme.service.AcmeIssuingService.stopFactory nows closes the persisted
HTTP client connections.
46 changes: 23 additions & 23 deletions src/txacme/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@ class AcmeIssuingService(Service):
:param cert_store: The certificate store containing the certificates to
manage.

:type client_creator: Callable[[], Deferred[`txacme.client.Client`]]
:param client_creator: A callable called with no arguments for creating the
ACME client. For example, ``partial(Client.from_url, reactor=reactor,
:type client: `txacme.client.Client`
:param client: A client which is already set to be used for an
environement. For example, ``Client.from_url(reactor=reactor,
mithrandi marked this conversation as resolved.
Show resolved Hide resolved
url=LETSENCRYPT_STAGING_DIRECTORY, key=acme_key, alg=RS256)``.
When the service is stopped, it will automatically call the stop
method on the client.

mithrandi marked this conversation as resolved.
Show resolved Hide resolved
:param clock: ``IReactorTime`` provider; usually the reactor, when not
testing.

Expand All @@ -65,7 +68,7 @@ class AcmeIssuingService(Service):
key generation requirements.
"""
cert_store = attr.ib()
_client_creator = attr.ib()
_client = attr.ib()
_clock = attr.ib()
_responders = attr.ib()
_email = attr.ib(default=None)
Expand Down Expand Up @@ -115,7 +118,7 @@ def check(certs):

d1 = (
gatherResults(
[self._with_client(self._issue_cert, server_name)
[self._issue_cert(server_name)
.addErrback(self._panic, server_name)
for server_name in panicing],
consumeErrors=True)
Expand Down Expand Up @@ -169,21 +172,15 @@ def finish(result):
d_issue, waiting = self._issuing[server_name]
waiting.append(d)
else:
d_issue = self._with_client(self._issue_cert, server_name)
d_issue = self._issue_cert(server_name)
waiting = [d]
self._issuing[server_name] = (d_issue, waiting)
# Add the callback afterwards in case we're using a client
# implementation that isn't actually async
d_issue.addBoth(finish)
return d

def _with_client(self, f, *a, **kw):
"""
Construct a client, and perform an operation with it.
"""
return self._client_creator().addCallback(f, *a, **kw)

def _issue_cert(self, client, server_name):
def _issue_cert(self, server_name):
"""
Issue a new cert for a particular name.
"""
Expand All @@ -200,10 +197,10 @@ def _issue_cert(self, client, server_name):
def answer_and_poll(authzr):
def got_challenge(stop_responding):
return (
poll_until_valid(authzr, self._clock, client)
poll_until_valid(authzr, self._clock, self._client)
.addBoth(tap(lambda _: stop_responding())))
return (
answer_challenge(authzr, client, self._responders)
answer_challenge(authzr, self._client, self._responders)
.addCallback(got_challenge))

def got_cert(certr):
Expand All @@ -223,13 +220,13 @@ def got_chain(chain):
return objects

return (
client.request_challenges(fqdn_identifier(server_name))
self._client.request_challenges(fqdn_identifier(server_name))
.addCallback(answer_and_poll)
.addCallback(lambda ign: client.request_issuance(
.addCallback(lambda ign: self._client.request_issuance(
CertificateRequest(
csr=csr_for_names([server_name], key))))
.addCallback(got_cert)
.addCallback(client.fetch_chain)
.addCallback(self._client.fetch_chain)
.addCallback(got_chain)
.addCallback(partial(self.cert_store.store, server_name)))

Expand All @@ -240,9 +237,9 @@ def _ensure_registered(self):
if self._registered:
return succeed(None)
else:
return self._with_client(self._register)
return self._register()

def _register(self, client):
def _register(self):
"""
Register and agree to the TOS.
"""
Expand All @@ -251,8 +248,8 @@ def _registered(regr):
self._registered = True
regr = messages.NewRegistration.from_data(email=self._email)
return (
client.register(regr)
.addCallback(client.agree_to_tos)
self._client.register(regr)
.addCallback(self._client.agree_to_tos)
.addCallback(_registered))

def when_certs_valid(self):
Expand Down Expand Up @@ -291,7 +288,10 @@ def stopService(self):
for d in list(self._waiting):
d.cancel()
self._waiting = []
return self._timer_service.stopService()

deferred = self._client.stop()
deferred.addCallback(lambda _: self._timer_service.stopService())
return deferred


__all__ = ['AcmeIssuingService']
2 changes: 1 addition & 1 deletion src/txacme/test/strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def dns_names():

def urls():
"""
Strategy for generating ``twisted.python.url.URL``\s.
Strategy for generating ``twisted.python.url.URL``.
"""
return s.builds(
URL,
Expand Down
24 changes: 22 additions & 2 deletions src/txacme/test/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ class FailingClient(object):
Test client that always fails.
"""
def __getattr__(self, name):
return lambda *a, **kw: fail(RuntimeError('Tried to do something'))
return lambda *a, **kw: fail(
RuntimeError('Failing at "%s".' % (name,)))


class AcmeFixture(Fixture):
Expand Down Expand Up @@ -331,11 +332,30 @@ def test_timer_errors(self):
error should be caught and logged.
"""
with AcmeFixture(client=FailingClient()) as fixture:
# Registration is triggered with service starts.
fixture.service.startService()
latest_logs = flush_logged_errors()
self.assertThat(latest_logs, HasLength(1))
self.assertThat(
str(latest_logs[0]), Contains('Failing at "register".'))

# Forcing a check will trigger again the registration.
self.assertThat(
fixture.service._check_certs(),
succeeded(Always()))
self.assertThat(flush_logged_errors(), HasLength(2))

latest_logs = flush_logged_errors()
self.assertThat(latest_logs, HasLength(1))
self.assertThat(
str(latest_logs[0]), Contains('Failing at "register".'))

# Manually stop the service to not stop it from the fixture
# and trigger another failure.
self.assertThat(
fixture.service.stopService(),
failed(AfterPreprocessing(
lambda f: f.value.args[0], Equals('Failing at "stop".'))))
latest_logs = flush_logged_errors()

def test_starting_stopping_cancellation(self):
"""
Expand Down
7 changes: 7 additions & 0 deletions src/txacme/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,13 @@ def _generate_ca_cert(self):
self._ca_aki = x509.AuthorityKeyIdentifier.from_issuer_public_key(
self._ca_key.public_key())

def stop(self):
"""
Called to stop the client and trigger cleanups.
"""
# Nothing to stop as reactor is not spun.
return succeed(None)

def register(self, new_reg=None):
self._registered = True
if new_reg is None:
Expand Down
6 changes: 5 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,15 @@ skip_install = true
commands = coverage erase

[testenv:coverage-report]
deps = coverage
deps =
coverage
diff_cover
skip_install = true
commands =
coverage combine
coverage report
coverage xml -o {envtmpdir}/coverage.xml
diff-cover {envtmpdir}/coverage.xml

[testenv:docs]
whitelist_externals =
Expand Down