diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 494aab710e7..d7a602d5c55 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,9 +1,4 @@ repos: - - repo: https://github.com/asottile/pyupgrade - rev: v2.23.1 - hooks: - - id: pyupgrade - args: ["--py36-plus"] - repo: https://github.com/pycqa/isort rev: 5.9.3 diff --git a/src/twisted/internet/_glibbase.py b/src/twisted/internet/_glibbase.py index 4fc5271e75b..a052e1e67db 100644 --- a/src/twisted/internet/_glibbase.py +++ b/src/twisted/internet/_glibbase.py @@ -19,6 +19,7 @@ from twisted.internet import base, posixbase, selectreactor from twisted.internet.interfaces import IReactorFDSet from twisted.python import log +from ._signals import _UnixWaker def ensureNotImported(moduleNames, errorMessage, preventImports=[]): @@ -48,13 +49,13 @@ def ensureNotImported(moduleNames, errorMessage, preventImports=[]): sys.modules[name] = None -class GlibWaker(posixbase._UnixWaker): +class GlibWaker(_UnixWaker): """ Run scheduled events after waking up. """ - def doRead(self): - posixbase._UnixWaker.doRead(self) + def doRead(self) -> None: + super().doRead() self.reactor._simulate() diff --git a/src/twisted/internet/_signals.py b/src/twisted/internet/_signals.py index 2ecc046b8c7..6c151decf6b 100644 --- a/src/twisted/internet/_signals.py +++ b/src/twisted/internet/_signals.py @@ -33,7 +33,19 @@ """ +import contextlib +import errno +import os import signal +import socket + +from zope.interface import Attribute, Interface, implementer + +from twisted.python import failure, log, util +from twisted.python.runtime import platformType + +if platformType == "posix": + from . import fdesc, process def installHandler(fd): @@ -66,3 +78,195 @@ def isDefaultHandler(): Determine whether the I{SIGCHLD} handler is the default or not. """ return signal.getsignal(signal.SIGCHLD) == signal.SIG_DFL + + +class _IWaker(Interface): + """ + Interface to wake up the event loop based on the self-pipe trick. + + The U{I{self-pipe trick}}, used to wake + up the main loop from another thread or a signal handler. + This is why we have wakeUp together with doRead + + This is used by threads or signals to wake up the event loop. + """ + + disconnected = Attribute("") + + def wakeUp(): + """ + Called when the event should be wake up. + """ + + def doRead(): + """ + Read some data from my connection and discard it. + """ + + def connectionLost(reason: failure.Failure) -> None: + """ + Called when connection was closed and the pipes. + """ + + +@implementer(_IWaker) +class _SocketWaker(log.Logger): + """ + The I{self-pipe trick}, implemented + using a pair of sockets rather than pipes (due to the lack of support in + select() on Windows for pipes), used to wake up the main loop from + another thread. + """ + + disconnected = 0 + + def __init__(self, reactor): + """Initialize.""" + self.reactor = reactor + # Following select_trigger (from asyncore)'s example; + client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + with contextlib.closing( + socket.socket(socket.AF_INET, socket.SOCK_STREAM) + ) as server: + server.bind(("127.0.0.1", 0)) + server.listen(1) + client.connect(server.getsockname()) + reader, clientaddr = server.accept() + client.setblocking(0) + reader.setblocking(0) + self.r = reader + self.w = client + self.fileno = self.r.fileno + + def wakeUp(self): + """Send a byte to my connection.""" + try: + util.untilConcludes(self.w.send, b"x") + except OSError as e: + if e.args[0] != errno.WSAEWOULDBLOCK: + raise + + def doRead(self): + """ + Read some data from my connection. + """ + try: + self.r.recv(8192) + except OSError: + pass + + def connectionLost(self, reason): + self.r.close() + self.w.close() + + +class _FDWaker(log.Logger): + """ + The I{self-pipe trick}, used to wake + up the main loop from another thread or a signal handler. + + L{_FDWaker} is a base class for waker implementations based on + writing to a pipe being monitored by the reactor. + + @ivar o: The file descriptor for the end of the pipe which can be + written to wake up a reactor monitoring this waker. + + @ivar i: The file descriptor which should be monitored in order to + be awoken by this waker. + """ + + disconnected = 0 + + i = None + o = None + + def __init__(self, reactor): + """Initialize.""" + self.reactor = reactor + self.i, self.o = os.pipe() + fdesc.setNonBlocking(self.i) + fdesc._setCloseOnExec(self.i) + fdesc.setNonBlocking(self.o) + fdesc._setCloseOnExec(self.o) + self.fileno = lambda: self.i + + def doRead(self): + """ + Read some bytes from the pipe and discard them. + """ + fdesc.readFromFD(self.fileno(), lambda data: None) + + def connectionLost(self, reason): + """Close both ends of my pipe.""" + if not hasattr(self, "o"): + return + for fd in self.i, self.o: + try: + os.close(fd) + except OSError: + pass + del self.i, self.o + + +@implementer(_IWaker) +class _UnixWaker(_FDWaker): + """ + This class provides a simple interface to wake up the event loop. + + This is used by threads or signals to wake up the event loop. + """ + + def wakeUp(self): + """Write one byte to the pipe, and flush it.""" + # We don't use fdesc.writeToFD since we need to distinguish + # between EINTR (try again) and EAGAIN (do nothing). + if self.o is not None: + try: + util.untilConcludes(os.write, self.o, b"x") + except OSError as e: + # XXX There is no unit test for raising the exception + # for other errnos. See #4285. + if e.errno != errno.EAGAIN: + raise + + +if platformType == "posix": + _Waker = _UnixWaker +else: + # Primarily Windows and Jython. + _Waker = _SocketWaker # type: ignore[misc,assignment] + + +class _SIGCHLDWaker(_FDWaker): + """ + L{_SIGCHLDWaker} can wake up a reactor whenever C{SIGCHLD} is + received. + """ + + def __init__(self, reactor): + _FDWaker.__init__(self, reactor) + + def install(self): + """ + Install the handler necessary to make this waker active. + """ + installHandler(self.o) + + def uninstall(self): + """ + Remove the handler which makes this waker active. + """ + installHandler(-1) + + def doRead(self): + """ + Having woken up the reactor in response to receipt of + C{SIGCHLD}, reap the process which exited. + + This is called whenever the reactor notices the waker pipe is + writeable, which happens soon after any call to the C{wakeUp} + method. + """ + _FDWaker.doRead(self) + process.reapAllProcesses() diff --git a/src/twisted/internet/cfreactor.py b/src/twisted/internet/cfreactor.py index 1fbf32b79db..9670570a4d6 100644 --- a/src/twisted/internet/cfreactor.py +++ b/src/twisted/internet/cfreactor.py @@ -44,15 +44,20 @@ ) from twisted.internet.interfaces import IReactorFDSet -from twisted.internet.posixbase import _NO_FILEDESC, PosixReactorBase, _Waker +from twisted.internet.posixbase import _NO_FILEDESC, PosixReactorBase from twisted.python import log +# We know that we're going to run on macOS so we can just pick the +# POSIX-appropriate waker. This also avoids having a dynamic base class and +# so lets more things get type checked. +from ._signals import _UnixWaker + _READ = 0 _WRITE = 1 _preserveSOError = 1 << 6 -class _WakerPlus(_Waker): +class _WakerPlus(_UnixWaker): """ The normal Twisted waker will simply wake up the main loop, which causes an iteration to run, which in turn causes L{ReactorBase.runUntilCurrent} @@ -74,7 +79,7 @@ def doRead(self): Wake up the loop and force C{runUntilCurrent} to run immediately in the next timed iteration. """ - result = _Waker.doRead(self) + result = super().doRead() self.reactor._scheduleSimulate(True) return result diff --git a/src/twisted/internet/posixbase.py b/src/twisted/internet/posixbase.py index 11561808cb4..8eb68666bdd 100644 --- a/src/twisted/internet/posixbase.py +++ b/src/twisted/internet/posixbase.py @@ -7,14 +7,11 @@ """ -import contextlib -import errno -import os import socket import sys from typing import Sequence -from zope.interface import Attribute, Interface, classImplements, implementer +from zope.interface import classImplements, implementer from twisted.internet import error, tcp, udp from twisted.internet.base import ReactorBase, _SignalReactorMixin @@ -31,8 +28,9 @@ IReactorUNIXDatagram, ) from twisted.internet.main import CONNECTION_DONE, CONNECTION_LOST -from twisted.python import failure, log, util +from twisted.python import failure, log from twisted.python.runtime import platform, platformType +from ._signals import _SIGCHLDWaker, _Waker # Exceptions that doSelect might return frequently _NO_FILENO = error.ConnectionFdescWentAway("Handler has no fileno method") @@ -57,7 +55,7 @@ processEnabled = False if unixEnabled: - from twisted.internet import _signals, fdesc, process, unix + from twisted.internet import process, unix processEnabled = True @@ -71,200 +69,6 @@ win32process = None -class _IWaker(Interface): - """ - Interface to wake up the event loop based on the self-pipe trick. - - The U{I{self-pipe trick}}, used to wake - up the main loop from another thread or a signal handler. - This is why we have wakeUp together with doRead - - This is used by threads or signals to wake up the event loop. - """ - - disconnected = Attribute("") - - def wakeUp(): - """ - Called when the event should be wake up. - """ - - def doRead(): - """ - Read some data from my connection and discard it. - """ - - def connectionLost(reason: failure.Failure): - """ - Called when connection was closed and the pipes. - """ - - -@implementer(_IWaker) -class _SocketWaker(log.Logger): - """ - The I{self-pipe trick}, implemented - using a pair of sockets rather than pipes (due to the lack of support in - select() on Windows for pipes), used to wake up the main loop from - another thread. - """ - - disconnected = 0 - - def __init__(self, reactor): - """Initialize.""" - self.reactor = reactor - # Following select_trigger (from asyncore)'s example; - client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - with contextlib.closing( - socket.socket(socket.AF_INET, socket.SOCK_STREAM) - ) as server: - server.bind(("127.0.0.1", 0)) - server.listen(1) - client.connect(server.getsockname()) - reader, clientaddr = server.accept() - client.setblocking(0) - reader.setblocking(0) - self.r = reader - self.w = client - self.fileno = self.r.fileno - - def wakeUp(self): - """Send a byte to my connection.""" - try: - util.untilConcludes(self.w.send, b"x") - except OSError as e: - if e.args[0] != errno.WSAEWOULDBLOCK: - raise - - def doRead(self): - """ - Read some data from my connection. - """ - try: - self.r.recv(8192) - except OSError: - pass - - def connectionLost(self, reason): - self.r.close() - self.w.close() - - -class _FDWaker(log.Logger): - """ - The I{self-pipe trick}, used to wake - up the main loop from another thread or a signal handler. - - L{_FDWaker} is a base class for waker implementations based on - writing to a pipe being monitored by the reactor. - - @ivar o: The file descriptor for the end of the pipe which can be - written to wake up a reactor monitoring this waker. - - @ivar i: The file descriptor which should be monitored in order to - be awoken by this waker. - """ - - disconnected = 0 - - i = None - o = None - - def __init__(self, reactor): - """Initialize.""" - self.reactor = reactor - self.i, self.o = os.pipe() - fdesc.setNonBlocking(self.i) - fdesc._setCloseOnExec(self.i) - fdesc.setNonBlocking(self.o) - fdesc._setCloseOnExec(self.o) - self.fileno = lambda: self.i - - def doRead(self): - """ - Read some bytes from the pipe and discard them. - """ - fdesc.readFromFD(self.fileno(), lambda data: None) - - def connectionLost(self, reason): - """Close both ends of my pipe.""" - if not hasattr(self, "o"): - return - for fd in self.i, self.o: - try: - os.close(fd) - except OSError: - pass - del self.i, self.o - - -@implementer(_IWaker) -class _UnixWaker(_FDWaker): - """ - This class provides a simple interface to wake up the event loop. - - This is used by threads or signals to wake up the event loop. - """ - - def wakeUp(self): - """Write one byte to the pipe, and flush it.""" - # We don't use fdesc.writeToFD since we need to distinguish - # between EINTR (try again) and EAGAIN (do nothing). - if self.o is not None: - try: - util.untilConcludes(os.write, self.o, b"x") - except OSError as e: - # XXX There is no unit test for raising the exception - # for other errnos. See #4285. - if e.errno != errno.EAGAIN: - raise - - -if platformType == "posix": - _Waker = _UnixWaker -else: - # Primarily Windows and Jython. - _Waker = _SocketWaker # type: ignore[misc,assignment] - - -class _SIGCHLDWaker(_FDWaker): - """ - L{_SIGCHLDWaker} can wake up a reactor whenever C{SIGCHLD} is - received. - - @see: L{twisted.internet._signals} - """ - - def __init__(self, reactor): - _FDWaker.__init__(self, reactor) - - def install(self): - """ - Install the handler necessary to make this waker active. - """ - _signals.installHandler(self.o) - - def uninstall(self): - """ - Remove the handler which makes this waker active. - """ - _signals.installHandler(-1) - - def doRead(self): - """ - Having woken up the reactor in response to receipt of - C{SIGCHLD}, reap the process which exited. - - This is called whenever the reactor notices the waker pipe is - writeable, which happens soon after any call to the C{wakeUp} - method. - """ - _FDWaker.doRead(self) - process.reapAllProcesses() - - class _DisconnectSelectableMixin: """ Mixin providing the C{_disconnectSelectable} method. diff --git a/src/twisted/newsfragments/11700.misc b/src/twisted/newsfragments/11700.misc new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/twisted/newsfragments/11702.misc b/src/twisted/newsfragments/11702.misc new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tox.ini b/tox.ini index 1be5cc7c3f6..3d97fd83d67 100644 --- a/tox.ini +++ b/tox.ini @@ -93,12 +93,12 @@ commands = posix: python -c "print('Running on POSIX (no special dependencies)')" ; Run tests without wrapping them using coverage. - nocov: python -m twisted.trial --temp-directory={envtmpdir}/_trial_temp --reporter={env:TRIAL_REPORTER:verbose} {env:TRIAL_ARGS:} {posargs:twisted} + nocov: python -m twisted.trial --temp-directory={envtmpdir}/_trial_temp --reporter={env:TRIAL_REPORTER:verbose} {env:TRIAL_ARGS:-j8} {posargs:twisted} ; Run the tests wrapped using coverage. withcov: python {toxinidir}/admin/_copy.py {toxinidir}/admin/zz_coverage.pth {envsitepackagesdir}/zz_coverage.pth withcov: coverage erase - withcov: coverage run -p --rcfile={toxinidir}/.coveragerc -m twisted.trial --temp-directory={envtmpdir}/_trial_temp --reporter={env:TRIAL_REPORTER:verbose} {env:TRIAL_ARGS:} {posargs:twisted} + withcov: coverage run -p --rcfile={toxinidir}/.coveragerc -m twisted.trial --temp-directory={envtmpdir}/_trial_temp --reporter={env:TRIAL_REPORTER:verbose} {env:TRIAL_ARGS:-j8} {posargs:twisted} lint: pre-commit {posargs:run --all-files --show-diff-on-failure}