From 0443a1f3cd54f219056ada3417a71e35f7d15029 Mon Sep 17 00:00:00 2001 From: Samuel Merritt Date: Thu, 7 Jun 2018 20:45:09 -0700 Subject: [PATCH] Always remove the right listener from the hub When in hubs.trampoline(fd, ...), a greenthread registers itself as a listener for fd, switches to the hub, and then calls hub.remove(listener) to deregister itself. hub.remove(listener) removes the primary listener. If the greenthread awoke because its fd became ready, then it is the primary listener, and everything is fine. However, if the greenthread was a secondary listener and awoke because a Timeout fired then it would remove the primary and promote a random secondary to primary. This commit makes hub.remove(listener) check to make sure listener is the primary, and if it's not, remove the listener from the secondaries. --- eventlet/hubs/hub.py | 26 +++++++++++++++--------- tests/hub_test.py | 47 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 63 insertions(+), 10 deletions(-) diff --git a/eventlet/hubs/hub.py b/eventlet/hubs/hub.py index 8871082edd..7c356f2368 100644 --- a/eventlet/hubs/hub.py +++ b/eventlet/hubs/hub.py @@ -225,15 +225,23 @@ def remove(self, listener): fileno = listener.fileno evtype = listener.evtype - self.listeners[evtype].pop(fileno, None) - # migrate a secondary listener to be the primary listener - if fileno in self.secondaries[evtype]: - sec = self.secondaries[evtype].get(fileno, None) - if not sec: - return - self.listeners[evtype][fileno] = sec.pop(0) - if not sec: - del self.secondaries[evtype][fileno] + if listener is self.listeners[evtype].get(fileno): + self.listeners[evtype].pop(fileno, None) + # migrate a secondary listener to be the primary listener + if fileno in self.secondaries[evtype]: + sec = self.secondaries[evtype].get(fileno, None) + if not sec: + return + self.listeners[evtype][fileno] = sec.pop(0) + if not sec: + del self.secondaries[evtype][fileno] + else: + sec = [l for l in self.secondaries[evtype].get(fileno, []) + if l is not listener] + if sec: + self.secondaries[evtype][fileno] = sec + else: + self.secondaries[evtype].pop(fileno, None) def mark_as_reopened(self, fileno): """ If a file descriptor is returned by the OS as the result of some diff --git a/tests/hub_test.py b/tests/hub_test.py index d62b8059cd..8b9bbe9427 100644 --- a/tests/hub_test.py +++ b/tests/hub_test.py @@ -1,11 +1,14 @@ from __future__ import with_statement +import errno +import fcntl +import os import sys import time import tests from tests import skip_with_pyevent, skip_if_no_itimer, skip_unless import eventlet -from eventlet import hubs +from eventlet import debug, hubs from eventlet.support import greenlets import six @@ -83,6 +86,48 @@ def test_cancel_proportion(self): eventlet.sleep() +class TestMultipleListenersCleanup(tests.LimitedTestCase): + def setUp(self): + super(TestMultipleListenersCleanup, self).setUp() + debug.hub_prevent_multiple_readers(False) + debug.hub_exceptions(False) + + def tearDown(self): + super(TestMultipleListenersCleanup, self).tearDown() + debug.hub_prevent_multiple_readers(True) + debug.hub_exceptions(True) + + def test_cleanup(self): + r, w = os.pipe() + self.addCleanup(os.close, r) + self.addCleanup(os.close, w) + + fcntl.fcntl(r, fcntl.F_SETFL, + fcntl.fcntl(r, fcntl.F_GETFL) | os.O_NONBLOCK) + + def readfd(fd): + while True: + try: + return os.read(fd, 1) + except OSError as e: + if e.errno != errno.EAGAIN: + raise + hubs.trampoline(fd, read=True) + + first_listener = eventlet.spawn(readfd, r) + eventlet.sleep() + + second_listener = eventlet.spawn(readfd, r) + eventlet.sleep() + + hubs.get_hub().schedule_call_global(0, second_listener.throw, + eventlet.Timeout(None)) + eventlet.sleep() + + os.write(w, b'.') + self.assertEqual(first_listener.wait(), b'.') + + class TestScheduleCall(tests.LimitedTestCase): def test_local(self):