Skip to content

Commit

Permalink
Merge pull request #140 from twisted/86-service-stop
Browse files Browse the repository at this point in the history
[Fixes #86] Close the HTTP pool when service is stopped.
  • Loading branch information
mithrandi committed Feb 14, 2020
2 parents 21e864f + b025cde commit 9f60aae
Show file tree
Hide file tree
Showing 13 changed files with 244 additions and 72 deletions.
7 changes: 7 additions & 0 deletions .github/PULL_REQUEST_TEMPLATE
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
## See CONTRIBUTING.rst for more details.

## Contributor Checklist:

* [ ] Created a newsfragment in src/txacme/newsfragments/.
* [ ] Updated the automated tests.
* [ ] The changes pass minimal style checks.
62 changes: 62 additions & 0 deletions CONTRIBUTING.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
Contributing to txacme
######################

We use `tox` to run the test in a controller environment.

Each change should consider covering the followings:

* Create a release notes fragment. See section below.
* Write automated tests to the point of having at least 100% code coverage.
* Documenting the API.
* Update the documentation with usage examples.


Documenting the changes
-----------------------

`towncrier <https://github.com/hawkowl/towncrier>`_
is used to manage the release notes.

Beside the normal docstring and API documentation,
each change which is visible to the users of txame should be documented in
the release notes.

To avoid merge conflict in the release notes files, each item of the release
notes is create in a separate file located in `src/txacme/newsfragments/`

The file will have the following format: ISSUE_ID.ITEM_TYPE.
`ISSUE_ID` is the GitHub Issue ID targeted by this branch.

`ITEM_TYPE` is one of the
`default types <https://github.com/hawkowl/towncrier#news-fragments>`_
supported by Towncrier.


Executing tests and checking coverage
-------------------------------------

You can run all tests in a specific environment, or just a single test::

$ tox -e py27-twlatest txacme.test.test_service
$ tox -e py27-twlatest \
txacme.test.test_service.AcmeIssuingServiceTests.test_timer_errors

You can check the test coverage, and diff coverage by running the dedicated
`coverage-report` tox env::

$ tox -e py27-twlatest,coverage-report

There is a tox environment dedicated to code style checks::

$ tox -e flake8

and another one for documentation and API checks::

$ tox -e docs

If executing the `tox` environment is too slow for you, you can always enable
a specific environment and execute the test with `trial`::

$ . .tox/py27-twlatest/bin/activate
$ pip install -e .
$ trial txacme.test.test_service.AcmeIssuingServiceTests.test_timer_errors
14 changes: 2 additions & 12 deletions src/integration/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,7 @@ class LetsEncryptStagingTLSSNI01Tests(ClientTestsMixin, TestCase):
ENDPOINT = ENDPOINT.encode('utf-8')

def _create_client(self, key):
return (
Client.from_url(reactor, LETSENCRYPT_STAGING_DIRECTORY, key=key)
.addCallback(tap(
lambda client: self.addCleanup(
client._client._treq._agent._pool.closeCachedConnections)))
)
return Client.from_url(reactor, LETSENCRYPT_STAGING_DIRECTORY, key=key)

def _create_responder(self):
action = start_action(action_type=u'integration:create_responder')
Expand Down Expand Up @@ -231,12 +226,7 @@ class LetsEncryptStagingLibcloudTests(ClientTestsMixin, TestCase):
skip = 'Must provide $ACME_HOST and $LIBCLOUD_*'

def _create_client(self, key):
return (
Client.from_url(reactor, LETSENCRYPT_STAGING_DIRECTORY, key=key)
.addCallback(tap(
lambda client: self.addCleanup(
client._client._treq._agent._pool.closeCachedConnections)))
)
return Client.from_url(reactor, LETSENCRYPT_STAGING_DIRECTORY, key=key)

def _create_responder(self):
with start_action(action_type=u'integration:create_responder'):
Expand Down
51 changes: 46 additions & 5 deletions src/txacme/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def _default_client(jws_client, reactor, key, alg):
if jws_client is None:
pool = HTTPConnectionPool(reactor)
agent = Agent(reactor, pool=pool)
jws_client = JWSClient(HTTPClient(agent=agent), key, alg)
jws_client = JWSClient(agent, key, alg)
return jws_client


Expand Down Expand Up @@ -116,6 +116,8 @@ def from_url(
"""
Construct a client from an ACME directory at a given URL.
At construct time, it validates the ACME directory.
:param url: The ``twisted.python.url.URL`` to fetch the directory from.
See `txacme.urls` for constants for various well-known public
directories.
Expand Down Expand Up @@ -146,6 +148,16 @@ def from_url(
.addCallback(cls, reactor, key, jws_client)
.addActionFinish())

def stop(self):
"""
Stops the client operation.
This cancels pending operations and does cleanup.
:return: A deferred which files when the client is stopped.
"""
return self._client.stop()

def register(self, new_reg=None):
"""
Create a new registration with the ACME server.
Expand Down Expand Up @@ -675,9 +687,11 @@ class JWSClient(object):
"""
timeout = _DEFAULT_TIMEOUT

def __init__(self, treq_client, key, alg,
def __init__(self, agent, key, alg,
user_agent=u'txacme/{}'.format(__version__).encode('ascii')):
self._treq = treq_client
self._treq = HTTPClient(agent=agent)
self._agent = agent
self._current_request = None
self._key = key
self._alg = alg
self._user_agent = user_agent
Expand Down Expand Up @@ -765,21 +779,48 @@ def _send_request(self, method, url, *args, **kwargs):
:return: Deferred firing with the HTTP response.
"""
def cb_request_done(result):
"""
Called when we got a response from the request.
"""
self._current_request = None
return result

action = LOG_JWS_REQUEST(url=url)
with action.context():
headers = kwargs.setdefault('headers', Headers())
headers.setRawHeaders(b'user-agent', [self._user_agent])
kwargs.setdefault('timeout', self.timeout)
self._current_request = self._treq.request(
method, url, *args, **kwargs)
return (
DeferredContext(
self._treq.request(method, url, *args, **kwargs))
DeferredContext(self._current_request)
.addCallback(cb_request_done)
.addCallback(
tap(lambda r: action.add_success_fields(
code=r.code,
content_type=r.headers.getRawHeaders(
b'content-type', [None])[0])))
.addActionFinish())

def stop(self):
"""
Stops the operation.
This cancels pending operations and does cleanup.
:return: A deferred which fires when the client is stopped.
"""
if self._current_request and not self._current_request.called:
self._current_request.addErrback(lambda _: None)
self._current_request.cancel()
self._current_request = None

agent_pool = getattr(self._agent, '_pool', None)
if agent_pool:
return agent_pool.closeCachedConnections()
return succeed(None)

def head(self, url, *args, **kwargs):
"""
Send HEAD request without checking the response.
Expand Down
9 changes: 4 additions & 5 deletions src/txacme/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class AutoTLSEndpoint(object):
reactor = attr.ib()
directory = attr.ib(
validator=lambda inst, a, value: check_directory_url_type(value))
client_creator = attr.ib()
client = attr.ib()
cert_store = attr.ib()
cert_mapping = attr.ib()
sub_endpoint = attr.ib()
Expand All @@ -101,8 +101,7 @@ def listen(self, protocolFactory): # noqa
def _got_port(port):
self.service = AcmeIssuingService(
cert_store=self.cert_store,
client_creator=partial(
self.client_creator, self.reactor, self.directory),
client=self.client,
clock=self.reactor,
responders=[responder],
check_interval=self.check_interval,
Expand Down Expand Up @@ -174,8 +173,8 @@ def colon_join(items):
return AutoTLSEndpoint(
reactor=reactor,
directory=directory,
client_creator=partial(
Client.from_url, key=acme_key, alg=RS256, timeout=timeout),
client=Client.from_url(
reactor, directory, key=acme_key, alg=RS256, timeout=timeout),
cert_store=DirectoryStore(pem_path),
cert_mapping=HostDirectoryMap(pem_path),
sub_endpoint=serverFromString(reactor, sub))
Expand Down
4 changes: 4 additions & 0 deletions src/txacme/newsfragments/86.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
INCOMPATIBLE CHANGE: txacme.service.AcmeIssuingService.stopFactory nows
closes the persisted HTTP client connections.
This is done to bring the in a state similar to the one before calling
startFactory.
4 changes: 4 additions & 0 deletions src/txacme/newsfragments/86.removal
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
INCOMPATIBLE CHANGE: txacme.client.JWSClient is now initialized with an
twisted.web.client.Agent instead of treq.client.HTTPClient.
In this way the usage of Treq is internal to txacme.
It was changed to make it easier to close the idle persistent connection.
46 changes: 23 additions & 23 deletions src/txacme/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@ class AcmeIssuingService(Service):
:param cert_store: The certificate store containing the certificates to
manage.
:type client_creator: Callable[[], Deferred[`txacme.client.Client`]]
:param client_creator: A callable called with no arguments for creating the
ACME client. For example, ``partial(Client.from_url, reactor=reactor,
:type client: `txacme.client.Client`
:param client: A client which is already set to be used for an
environment. For example, ``Client.from_url(reactor=reactor,
url=LETSENCRYPT_STAGING_DIRECTORY, key=acme_key, alg=RS256)``.
When the service is stopped, it will automatically call the stop
method on the client.
:param clock: ``IReactorTime`` provider; usually the reactor, when not
testing.
Expand All @@ -65,7 +68,7 @@ class AcmeIssuingService(Service):
key generation requirements.
"""
cert_store = attr.ib()
_client_creator = attr.ib()
_client = attr.ib()
_clock = attr.ib()
_responders = attr.ib()
_email = attr.ib(default=None)
Expand Down Expand Up @@ -115,7 +118,7 @@ def check(certs):

d1 = (
gatherResults(
[self._with_client(self._issue_cert, server_name)
[self._issue_cert(server_name)
.addErrback(self._panic, server_name)
for server_name in panicing],
consumeErrors=True)
Expand Down Expand Up @@ -169,21 +172,15 @@ def finish(result):
d_issue, waiting = self._issuing[server_name]
waiting.append(d)
else:
d_issue = self._with_client(self._issue_cert, server_name)
d_issue = self._issue_cert(server_name)
waiting = [d]
self._issuing[server_name] = (d_issue, waiting)
# Add the callback afterwards in case we're using a client
# implementation that isn't actually async
d_issue.addBoth(finish)
return d

def _with_client(self, f, *a, **kw):
"""
Construct a client, and perform an operation with it.
"""
return self._client_creator().addCallback(f, *a, **kw)

def _issue_cert(self, client, server_name):
def _issue_cert(self, server_name):
"""
Issue a new cert for a particular name.
"""
Expand All @@ -200,10 +197,10 @@ def _issue_cert(self, client, server_name):
def answer_and_poll(authzr):
def got_challenge(stop_responding):
return (
poll_until_valid(authzr, self._clock, client)
poll_until_valid(authzr, self._clock, self._client)
.addBoth(tap(lambda _: stop_responding())))
return (
answer_challenge(authzr, client, self._responders)
answer_challenge(authzr, self._client, self._responders)
.addCallback(got_challenge))

def got_cert(certr):
Expand All @@ -223,13 +220,13 @@ def got_chain(chain):
return objects

return (
client.request_challenges(fqdn_identifier(server_name))
self._client.request_challenges(fqdn_identifier(server_name))
.addCallback(answer_and_poll)
.addCallback(lambda ign: client.request_issuance(
.addCallback(lambda ign: self._client.request_issuance(
CertificateRequest(
csr=csr_for_names([server_name], key))))
.addCallback(got_cert)
.addCallback(client.fetch_chain)
.addCallback(self._client.fetch_chain)
.addCallback(got_chain)
.addCallback(partial(self.cert_store.store, server_name)))

Expand All @@ -240,9 +237,9 @@ def _ensure_registered(self):
if self._registered:
return succeed(None)
else:
return self._with_client(self._register)
return self._register()

def _register(self, client):
def _register(self):
"""
Register and agree to the TOS.
"""
Expand All @@ -251,8 +248,8 @@ def _registered(regr):
self._registered = True
regr = messages.NewRegistration.from_data(email=self._email)
return (
client.register(regr)
.addCallback(client.agree_to_tos)
self._client.register(regr)
.addCallback(self._client.agree_to_tos)
.addCallback(_registered))

def when_certs_valid(self):
Expand Down Expand Up @@ -291,7 +288,10 @@ def stopService(self):
for d in list(self._waiting):
d.cancel()
self._waiting = []
return self._timer_service.stopService()

deferred = self._client.stop()
deferred.addCallback(lambda _: self._timer_service.stopService())
return deferred


__all__ = ['AcmeIssuingService']

0 comments on commit 9f60aae

Please sign in to comment.