diff --git a/eventlet/hubs/hub.py b/eventlet/hubs/hub.py index 112f4674a7..70f124371b 100644 --- a/eventlet/hubs/hub.py +++ b/eventlet/hubs/hub.py @@ -224,15 +224,23 @@ def remove(self, listener): fileno = listener.fileno evtype = listener.evtype - self.listeners[evtype].pop(fileno, None) - # migrate a secondary listener to be the primary listener - if fileno in self.secondaries[evtype]: - sec = self.secondaries[evtype].get(fileno, None) - if not sec: - return - self.listeners[evtype][fileno] = sec.pop(0) - if not sec: - del self.secondaries[evtype][fileno] + if listener is self.listeners[evtype].get(fileno): + self.listeners[evtype].pop(fileno, None) + # migrate a secondary listener to be the primary listener + if fileno in self.secondaries[evtype]: + sec = self.secondaries[evtype].get(fileno, None) + if not sec: + return + self.listeners[evtype][fileno] = sec.pop(0) + if not sec: + del self.secondaries[evtype][fileno] + else: + sec = [l for l in self.secondaries[evtype].get(fileno, []) + if l is not listener] + if sec: + self.secondaries[evtype][fileno] = sec + else: + self.secondaries[evtype].pop(fileno, None) def mark_as_reopened(self, fileno): """ If a file descriptor is returned by the OS as the result of some diff --git a/tests/hub_test.py b/tests/hub_test.py index 61b5b0b977..d1a003e414 100644 --- a/tests/hub_test.py +++ b/tests/hub_test.py @@ -1,4 +1,7 @@ from __future__ import with_statement +import errno +import fcntl +import os import sys import time @@ -6,7 +9,7 @@ from tests import skip_with_pyevent, skip_if_no_itimer, skip_unless from tests.patcher_test import ProcessBase import eventlet -from eventlet import hubs +from eventlet import debug, hubs from eventlet.support import greenlets import six @@ -84,6 +87,48 @@ def test_cancel_proportion(self): eventlet.sleep() +class TestMultipleListenersCleanup(tests.LimitedTestCase): + def setUp(self): + super(TestMultipleListenersCleanup, self).setUp() + debug.hub_prevent_multiple_readers(False) + debug.hub_exceptions(False) + + def tearDown(self): + super(TestMultipleListenersCleanup, self).tearDown() + debug.hub_prevent_multiple_readers(True) + debug.hub_exceptions(True) + + def test_cleanup(self): + r, w = os.pipe() + self.addCleanup(os.close, r) + self.addCleanup(os.close, w) + + fcntl.fcntl(r, fcntl.F_SETFL, + fcntl.fcntl(r, fcntl.F_GETFL) | os.O_NONBLOCK) + + def readfd(fd): + while True: + try: + return os.read(fd, 1) + except OSError as e: + if e.errno != errno.EAGAIN: + raise + hubs.trampoline(fd, read=True) + + first_listener = eventlet.spawn(readfd, r) + eventlet.sleep() + + second_listener = eventlet.spawn(readfd, r) + eventlet.sleep() + + hubs.get_hub().schedule_call_global(0, second_listener.throw, + eventlet.Timeout(None)) + eventlet.sleep() + + os.write(w, b'.') + self.assertEqual(first_listener.wait(), b'.') + + class TestScheduleCall(tests.LimitedTestCase): def test_local(self):