Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Twisted concurrency model #816

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 5 additions & 4 deletions coverage/cmdline.py
Expand Up @@ -37,6 +37,7 @@ class Opts(object):
)
CONCURRENCY_CHOICES = [
"thread", "gevent", "greenlet", "eventlet", "multiprocessing",
"twisted",
]
concurrency = optparse.make_option(
'', '--concurrency', action='store', metavar="LIB",
Expand Down Expand Up @@ -635,16 +636,16 @@ def do_run(self, options, args):
show_help("Can't append to data files in parallel mode.")
return ERR

if options.concurrency == "multiprocessing":
if options.concurrency in ("multiprocessing", "twisted"):
# Can't set other run-affecting command line options with
# multiprocessing.
# multiprocessing or Twisted.
for opt_name in ['branch', 'include', 'omit', 'pylib', 'source', 'timid']:
# As it happens, all of these options have no default, meaning
# they will be None if they have not been specified.
if getattr(options, opt_name) is not None:
show_help(
"Options affecting multiprocessing must only be specified "
"in a configuration file.\n"
"Options affecting multiprocessing and twisted must "
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Will probably not be very useful reviewing much of this, but it'd be a bit nicer UX-wise if rather than saying "A or B" here you used .format and inserted in options.concurrency, i.e. the actual one of the two of those that the user really did use, instead of including ones they didn't)

"only be specified in a configuration file.\n"
"Remove --{} from the command line.".format(opt_name)
)
return ERR
Expand Down
38 changes: 32 additions & 6 deletions coverage/control.py
Expand Up @@ -37,6 +37,11 @@
# Jython has no multiprocessing module.
patch_multiprocessing = None

try:
from coverage.twistedproc import patch_twisted
except ImportError:
patch_twisted = None

os = isolate_module(os)


Expand Down Expand Up @@ -110,8 +115,8 @@ def __init__(
`concurrency` is a string indicating the concurrency library being used
in the measured code. Without this, coverage.py will get incorrect
results if these libraries are in use. Valid strings are "greenlet",
"eventlet", "gevent", "multiprocessing", or "thread" (the default).
This can also be a list of these strings.
"eventlet", "gevent", "multiprocessing", "twisted", or "thread" (the
default). This can also be a list of these strings.

If `check_preimported` is true, then when coverage is started, the
aleady-imported files will be checked to see if they should be measured
Expand Down Expand Up @@ -335,10 +340,9 @@ def load(self):
if not should_skip:
self._data.read()

def _init_for_start(self):
"""Initialization for start()"""
# Construct the collector.
concurrency = self.config.concurrency or []
def _init_multiprocessing(self, concurrency):
"""Enable collection in multiprocessing workers, if requested.
"""
if "multiprocessing" in concurrency:
if not patch_multiprocessing:
raise CoverageException( # pragma: only jython
Expand All @@ -349,6 +353,28 @@ def _init_for_start(self):
# it for the main process.
self.config.parallel = True

def _init_twisted(self, concurrency):
"""Enable collection in Twisted-spawned children, if requested.
"""
if "twisted" in concurrency:
if patch_twisted is None:
# XXX Untested
raise CoverageException(
"twisted is not supported on this Python"
)
patch_twisted(
rcfile=self.config.config_file,
)
# XXX Untested
self.config.parallel = True

def _init_for_start(self):
"""Initialization for start()"""
# Construct the collector.
concurrency = self.config.concurrency or []
self._init_multiprocessing(concurrency)
self._init_twisted(concurrency)

dycon = self.config.dynamic_context
if not dycon or dycon == "none":
context_switchers = []
Expand Down
85 changes: 85 additions & 0 deletions coverage/twistedproc.py
@@ -0,0 +1,85 @@
# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt

"""Monkey-patching to add Twisted support for coverage.py
"""

from functools import partial
from tempfile import mkdtemp
from os.path import join

# XXX do it without installing the default reactor
import twisted.internet.reactor

from coverage.misc import contract

# An attribute that will be set on the module to indicate that it has been
# monkey-patched. Value copied from multiproc.py.
PATCHED_MARKER = "_coverage$patched"

@contract(rcfile=str)
def patch_twisted(rcfile):
"""
The twisted.internet.interfaces.IReactorProcess.spawnProcess
implementation of the Twisted reactor is patched to enable coverage
collection in spawned processes.

This works by clobbering sitecustomize.
"""
if getattr(twisted.internet, PATCHED_MARKER, False):
return

origSpawnProcess = twisted.internet.reactor.spawnProcess
twisted.internet.reactor.spawnProcess = partial(
_coverageSpawnProcess,
origSpawnProcess,
rcfile,
)
setattr(twisted.internet, PATCHED_MARKER, True)


def _coverageSpawnProcess(
origSpawnProcess,
rcfile,
processProtocol,
executable,
args=(),
env=None,
*a,
**kw
):
"""
Spawn a process using ``origSpawnProcess``. Set up its environment so
that coverage its collected, if it is a Python process.
"""
if env is None:
env = os.environ.copy()
pythonpath = env.get(u"PYTHONPATH", u"").split(u":")
dtemp = mkdtemp()
pythonpath.insert(0, dtemp)
sitecustomize = join(dtemp, u"sitecustomize.py")
with open(sitecustomize, "wt") as f:
f.write("""\
import sys, os.path
sys.path.remove({dtemp!r})
os.remove({sitecustomize!r})
if os.path.exists({sitecustomizec!r}):
os.remove({sitecustomizec!r})
os.rmdir({dtemp!r})
import coverage
coverage.process_startup()
""".format(
sitecustomize=sitecustomize,
sitecustomizec=sitecustomize + u"c",
dtemp=dtemp,
))
env[u"PYTHONPATH"] = u":".join(pythonpath)
env[u"COVERAGE_PROCESS_START"] = rcfile
return origSpawnProcess(
processProtocol,
executable,
args,
env,
*a,
**kw
)
79 changes: 79 additions & 0 deletions tests/test_concurrency.py
Expand Up @@ -6,6 +6,7 @@
import os
import random
import sys
import textwrap
import threading
import time

Expand Down Expand Up @@ -42,6 +43,10 @@
except ImportError: # pragma: only jython
greenlet = None

try:
import twisted.internet.reactor
except ImportError:
twisted = None

def measurable_line(l):
"""Is this a line of code coverage will measure?
Expand Down Expand Up @@ -462,6 +467,80 @@ def test_multiprocessing_with_branching(self):
self.try_multiprocessing_code_with_branching(code, expected_out)


TWISTED_CODE = """
# Above this will be a definition of work().
from sys import executable, argv, stdout
from twisted.internet.task import react
from twisted.internet.utils import getProcessOutput
from twisted.internet.defer import inlineCallbacks

@inlineCallbacks
def master(reactor, *xs):
for x in xs:
y = yield getProcessOutput(executable, [__file__, "worker", x])
stdout.write(y)

def worker(x):
print("work(%s) = %s" % (x, work(int(x))))

if __name__ == '__main__':
if argv[1] == "master":
react(master, argv[2:])
else:
worker(*argv[2:])
"""

class TwistedTest(CoverageTest):
"""Test support of Twisted's multi-process feature."""

keep_temp_dir = True

def setUp(self):
if twisted is None:
self.skipTest("No Twisted in this Python")
super(TwistedTest, self).setUp()

def test_twisted(self):
"""
With the Twisted concurrency model enabled, coverage is collected from
child Python processes that are started with
IReactorProcess.spawnProcess.
"""
code = SQUARE_OR_CUBE_WORK + TWISTED_CODE
source_file = "spawnprocess.py"
self.make_file(source_file, textwrap.dedent(code))
self.make_file(".coveragerc", """\
[run]
concurrency = twisted
source = .
""")

x = 41
y = x + 1
command = (
"coverage run --concurrency=twisted "
"spawnprocess.py master %s %s" % (x, y)
)
out = self.run_command(command)
expected_out = (
"work(%s) = %s\n"
"work(%s) = %s\n"
) % (
x, x ** 2,
y, y ** 3,
)

# Make sure it actually ran the code we were trying to make it run.
self.assertEqual(out, expected_out)

# Make sure the coverage collected on that run is what we expect, too.
out = self.run_command("coverage combine")
self.assertEqual(out, "")
out = self.run_command("coverage report -m")
last_line = self.squeezed_lines(out)[-1]
self.assertRegex(last_line, r"%s \d+ 0 100%%" % (source_file,))


def test_coverage_stop_in_threads():
has_started_coverage = []
has_stopped_coverage = []
Expand Down
1 change: 1 addition & 0 deletions tox.ini
Expand Up @@ -15,6 +15,7 @@ deps =
-r requirements/pytest.pip
pip==19.1.1
setuptools==41.0.1
twisted==19.2.1
# gevent 1.3 causes a failure: https://github.com/nedbat/coveragepy/issues/663
py{27,35,36}: gevent==1.2.2
py{27,35,36,37,38}: eventlet==0.24.1
Expand Down