Skip to content

Commit

Permalink
Merge pull request #1598 from twisted/10198-mypy-test_core
Browse files Browse the repository at this point in the history
Author: wsanchez
Reviewer: adiroiban, grangert
Fixes: ticket:1598

Add type hints to twisted.internet.test.test_core.
  • Loading branch information
wsanchez committed Nov 17, 2021
2 parents 3d0be31 + 68691a9 commit a8aae68
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 71 deletions.
4 changes: 0 additions & 4 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -301,10 +301,6 @@ allow_incomplete_defs = True
allow_untyped_defs = True
check_untyped_defs = False

[mypy-twisted.internet.test.test_core]
allow_untyped_defs = True
check_untyped_defs = False

[mypy-twisted.internet.test.test_default]
allow_untyped_defs = True
check_untyped_defs = False
Expand Down
153 changes: 86 additions & 67 deletions src/twisted/internet/test/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,23 @@
import inspect
import signal
import time
from types import FrameType
from typing import Callable, List, Optional, Tuple, Union, cast

from twisted.internet.abstract import FileDescriptor
from twisted.internet.defer import Deferred
from twisted.internet.error import ReactorAlreadyRunning, ReactorNotRestartable
from twisted.internet.test.reactormixins import ReactorBuilder
from twisted.python.failure import Failure
from twisted.trial.unittest import SynchronousTestCase


class ObjectModelIntegrationMixin:
"""
Helpers for tests about the object model of reactor-related objects.
"""

def assertFullyNewStyle(self, instance):
def assertFullyNewStyle(self, instance: object) -> None:
"""
Assert that the given object is an instance of a new-style class and
that there are no classic classes in the inheritance hierarchy of
Expand All @@ -30,10 +34,11 @@ def assertFullyNewStyle(self, instance):
This is a beneficial condition because PyPy is better able to
optimize attribute lookup on such classes.
"""
self.assertIsInstance(instance, object)
testCase = cast(SynchronousTestCase, self)
testCase.assertIsInstance(instance, object)
mro = inspect.getmro(type(instance))
for subclass in mro:
self.assertTrue(
testCase.assertTrue(
issubclass(subclass, object), f"{subclass!r} is not new-style"
)

Expand All @@ -43,7 +48,7 @@ class ObjectModelIntegrationTests(ReactorBuilder, ObjectModelIntegrationMixin):
Test details of object model integration against all reactors.
"""

def test_newstyleReactor(self):
def test_newstyleReactor(self) -> None:
"""
Checks that all reactors on a platform have method resolution order
containing only new style classes.
Expand All @@ -58,38 +63,38 @@ class SystemEventTestsBuilder(ReactorBuilder):
and L{IReactorCore.fireSystemEvent}.
"""

def test_stopWhenNotStarted(self):
def test_stopWhenNotStarted(self) -> None:
"""
C{reactor.stop()} raises L{RuntimeError} when called when the reactor
has not been started.
"""
reactor = self.buildReactor()
self.assertRaises(RuntimeError, reactor.stop)
cast(SynchronousTestCase, self).assertRaises(RuntimeError, reactor.stop)

def test_stopWhenAlreadyStopped(self):
def test_stopWhenAlreadyStopped(self) -> None:
"""
C{reactor.stop()} raises L{RuntimeError} when called after the reactor
has been stopped.
"""
reactor = self.buildReactor()
reactor.callWhenRunning(reactor.stop)
self.runReactor(reactor)
self.assertRaises(RuntimeError, reactor.stop)
cast(SynchronousTestCase, self).assertRaises(RuntimeError, reactor.stop)

def test_callWhenRunningOrder(self):
def test_callWhenRunningOrder(self) -> None:
"""
Functions are run in the order that they were passed to
L{reactor.callWhenRunning}.
"""
reactor = self.buildReactor()
events = []
events: List[str] = []
reactor.callWhenRunning(events.append, "first")
reactor.callWhenRunning(events.append, "second")
reactor.callWhenRunning(reactor.stop)
self.runReactor(reactor)
self.assertEqual(events, ["first", "second"])
cast(SynchronousTestCase, self).assertEqual(events, ["first", "second"])

def test_runningForStartupEvents(self):
def test_runningForStartupEvents(self) -> None:
"""
The reactor is not running when C{"before"} C{"startup"} triggers are
called and is running when C{"during"} and C{"after"} C{"startup"}
Expand All @@ -98,54 +103,60 @@ def test_runningForStartupEvents(self):
reactor = self.buildReactor()
state = {}

def beforeStartup():
def beforeStartup() -> None:
state["before"] = reactor.running

def duringStartup():
def duringStartup() -> None:
state["during"] = reactor.running

def afterStartup():
def afterStartup() -> None:
state["after"] = reactor.running

testCase = cast(SynchronousTestCase, self)

reactor.addSystemEventTrigger("before", "startup", beforeStartup)
reactor.addSystemEventTrigger("during", "startup", duringStartup)
reactor.addSystemEventTrigger("after", "startup", afterStartup)
reactor.callWhenRunning(reactor.stop)
self.assertEqual(state, {})
testCase.assertEqual(state, {})
self.runReactor(reactor)
self.assertEqual(state, {"before": False, "during": True, "after": True})
testCase.assertEqual(state, {"before": False, "during": True, "after": True})

def test_signalHandlersInstalledDuringStartup(self):
def test_signalHandlersInstalledDuringStartup(self) -> None:
"""
Signal handlers are installed in responsed to the C{"during"}
C{"startup"}.
"""
reactor = self.buildReactor()
phase = [None]
phase: Optional[str] = None

def beforeStartup():
phase[0] = "before"
def beforeStartup() -> None:
nonlocal phase
phase = "before"

def afterStartup():
phase[0] = "after"
def afterStartup() -> None:
nonlocal phase
phase = "after"

reactor.addSystemEventTrigger("before", "startup", beforeStartup)
reactor.addSystemEventTrigger("after", "startup", afterStartup)

sawPhase = []

def fakeSignal(signum, action):
sawPhase.append(phase[0])
def fakeSignal(signum: int, action: Callable[[int, FrameType], None]) -> None:
sawPhase.append(phase)

testCase = cast(SynchronousTestCase, self)

self.patch(signal, "signal", fakeSignal)
testCase.patch(signal, "signal", fakeSignal)
reactor.callWhenRunning(reactor.stop)
self.assertIsNone(phase[0])
self.assertEqual(sawPhase, [])
testCase.assertIsNone(phase)
testCase.assertEqual(sawPhase, [])
self.runReactor(reactor)
self.assertIn("before", sawPhase)
self.assertEqual(phase[0], "after")
testCase.assertIn("before", sawPhase)
testCase.assertEqual(phase, "after")

def test_stopShutDownEvents(self):
def test_stopShutDownEvents(self) -> None:
"""
C{reactor.stop()} fires all three phases of shutdown event triggers
before it makes C{reactor.run()} return.
Expand All @@ -163,32 +174,34 @@ def test_stopShutDownEvents(self):
)
reactor.callWhenRunning(reactor.stop)
self.runReactor(reactor)
self.assertEqual(
cast(SynchronousTestCase, self).assertEqual(
events,
[("before", "shutdown"), ("during", "shutdown"), ("after", "shutdown")],
)

def test_shutdownFiresTriggersAsynchronously(self):
def test_shutdownFiresTriggersAsynchronously(self) -> None:
"""
C{"before"} C{"shutdown"} triggers are not run synchronously from
L{reactor.stop}.
"""
reactor = self.buildReactor()
events = []
events: List[str] = []
reactor.addSystemEventTrigger(
"before", "shutdown", events.append, "before shutdown"
)

def stopIt():
def stopIt() -> None:
reactor.stop()
events.append("stopped")

testCase = cast(SynchronousTestCase, self)

reactor.callWhenRunning(stopIt)
self.assertEqual(events, [])
testCase.assertEqual(events, [])
self.runReactor(reactor)
self.assertEqual(events, ["stopped", "before shutdown"])
testCase.assertEqual(events, ["stopped", "before shutdown"])

def test_shutdownDisconnectsCleanly(self):
def test_shutdownDisconnectsCleanly(self) -> None:
"""
A L{IFileDescriptor.connectionLost} implementation which raises an
exception does not prevent the remaining L{IFileDescriptor}s from
Expand All @@ -198,13 +211,14 @@ def test_shutdownDisconnectsCleanly(self):

# Subclass FileDescriptor to get logPrefix
class ProblematicFileDescriptor(FileDescriptor):
def connectionLost(self, reason):
def connectionLost(self, reason: Failure) -> None:
raise RuntimeError("simulated connectionLost error")

class OKFileDescriptor(FileDescriptor):
def connectionLost(self, reason):
def connectionLost(self, reason: Failure) -> None:
lostOK[0] = True

testCase = cast(SynchronousTestCase, self)
reactor = self.buildReactor()

# Unfortunately, it is necessary to patch removeAll to directly control
Expand All @@ -216,51 +230,53 @@ def connectionLost(self, reason):
reactor.removeAll = lambda: fds
reactor.callWhenRunning(reactor.stop)
self.runReactor(reactor)
self.assertEqual(len(self.flushLoggedErrors(RuntimeError)), 1)
self.assertTrue(lostOK[0])
testCase.assertEqual(len(testCase.flushLoggedErrors(RuntimeError)), 1)
testCase.assertTrue(lostOK[0])

def test_multipleRun(self):
def test_multipleRun(self) -> None:
"""
C{reactor.run()} raises L{ReactorAlreadyRunning} when called when
the reactor is already running.
"""
events = []
events: List[str] = []

testCase = cast(SynchronousTestCase, self)

def reentrantRun():
self.assertRaises(ReactorAlreadyRunning, reactor.run)
def reentrantRun() -> None:
testCase.assertRaises(ReactorAlreadyRunning, reactor.run)
events.append("tested")

reactor = self.buildReactor()
reactor.callWhenRunning(reentrantRun)
reactor.callWhenRunning(reactor.stop)
self.runReactor(reactor)
self.assertEqual(events, ["tested"])
testCase.assertEqual(events, ["tested"])

def test_runWithAsynchronousBeforeStartupTrigger(self):
def test_runWithAsynchronousBeforeStartupTrigger(self) -> None:
"""
When there is a C{'before'} C{'startup'} trigger which returns an
unfired L{Deferred}, C{reactor.run()} starts the reactor and does not
return until after C{reactor.stop()} is called
"""
events = []

def trigger():
def trigger() -> Deferred[object]:
events.append("trigger")
d = Deferred()
d: Deferred[object] = Deferred()
d.addCallback(callback)
reactor.callLater(0, d.callback, None)
return d

def callback(ignored):
def callback(ignored: object) -> None:
events.append("callback")
reactor.stop()

reactor = self.buildReactor()
reactor.addSystemEventTrigger("before", "startup", trigger)
self.runReactor(reactor)
self.assertEqual(events, ["trigger", "callback"])
cast(SynchronousTestCase, self).assertEqual(events, ["trigger", "callback"])

def test_iterate(self):
def test_iterate(self) -> None:
"""
C{reactor.iterate()} does not block.
"""
Expand All @@ -271,10 +287,10 @@ def test_iterate(self):
reactor.iterate(0) # Shouldn't block
elapsed = time.time() - start

self.assertTrue(elapsed < 2)
cast(SynchronousTestCase, self).assertTrue(elapsed < 2)
t.cancel()

def test_crash(self):
def test_crash(self) -> None:
"""
C{reactor.crash()} stops the reactor and does not fire shutdown
triggers.
Expand All @@ -286,50 +302,53 @@ def test_crash(self):
)
reactor.callWhenRunning(reactor.callLater, 0, reactor.crash)
self.runReactor(reactor)
self.assertFalse(reactor.running)
self.assertFalse(
testCase = cast(SynchronousTestCase, self)
testCase.assertFalse(reactor.running)
testCase.assertFalse(
events, "Shutdown triggers invoked but they should not have been."
)

def test_runAfterCrash(self):
def test_runAfterCrash(self) -> None:
"""
C{reactor.run()} restarts the reactor after it has been stopped by
C{reactor.crash()}.
"""
events = []
events: List[Union[str, Tuple[str, bool]]] = []

def crash():
def crash() -> None:
events.append("crash")
reactor.crash()

reactor = self.buildReactor()
reactor.callWhenRunning(crash)
self.runReactor(reactor)

def stop():
def stop() -> None:
events.append(("stop", reactor.running))
reactor.stop()

reactor.callWhenRunning(stop)
self.runReactor(reactor)
self.assertEqual(events, ["crash", ("stop", True)])
cast(SynchronousTestCase, self).assertEqual(events, ["crash", ("stop", True)])

def test_runAfterStop(self):
def test_runAfterStop(self) -> None:
"""
C{reactor.run()} raises L{ReactorNotRestartable} when called when
the reactor is being run after getting stopped priorly.
"""
events = []
events: List[str] = []

testCase = cast(SynchronousTestCase, self)

def restart():
self.assertRaises(ReactorNotRestartable, reactor.run)
def restart() -> None:
testCase.assertRaises(ReactorNotRestartable, reactor.run)
events.append("tested")

reactor = self.buildReactor()
reactor.callWhenRunning(reactor.stop)
reactor.addSystemEventTrigger("after", "shutdown", restart)
self.runReactor(reactor)
self.assertEqual(events, ["tested"])
testCase.assertEqual(events, ["tested"])


globals().update(SystemEventTestsBuilder.makeTestCaseClasses())
Expand Down
1 change: 1 addition & 0 deletions src/twisted/newsfragments/10198.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add type hints to twisted.internet.test.test_core

0 comments on commit a8aae68

Please sign in to comment.