From a56a72284233ecec08ee1506cc26d6f01d525fe0 Mon Sep 17 00:00:00 2001 From: Samuel Merritt Date: Thu, 7 Jun 2018 20:45:09 -0700 Subject: [PATCH] Always remove the right listener from the hub When in hubs.trampoline(fd, ...), a greenthread registers itself as a listener for fd, switches to the hub, and then calls hub.remove(listener) to deregister itself. hub.remove(listener) removes the primary listener. If the greenthread awoke because its fd became ready, then it is the primary listener, and everything is fine. However, if the greenthread was a secondary listener and awoke because a Timeout fired then it would remove the primary and promote a random secondary to primary. This commit makes hub.remove(listener) check to make sure listener is the primary, and if it's not, remove the listener from the secondaries. --- eventlet/hubs/hub.py | 26 +++++++++++++++--------- tests/hub_test.py | 47 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 63 insertions(+), 10 deletions(-) diff --git a/eventlet/hubs/hub.py b/eventlet/hubs/hub.py index 112f4674a7..70f124371b 100644 --- a/eventlet/hubs/hub.py +++ b/eventlet/hubs/hub.py @@ -224,15 +224,23 @@ def remove(self, listener): fileno = listener.fileno evtype = listener.evtype - self.listeners[evtype].pop(fileno, None) - # migrate a secondary listener to be the primary listener - if fileno in self.secondaries[evtype]: - sec = self.secondaries[evtype].get(fileno, None) - if not sec: - return - self.listeners[evtype][fileno] = sec.pop(0) - if not sec: - del self.secondaries[evtype][fileno] + if listener is self.listeners[evtype].get(fileno): + self.listeners[evtype].pop(fileno, None) + # migrate a secondary listener to be the primary listener + if fileno in self.secondaries[evtype]: + sec = self.secondaries[evtype].get(fileno, None) + if not sec: + return + self.listeners[evtype][fileno] = sec.pop(0) + if not sec: + del self.secondaries[evtype][fileno] + else: + sec = [l for l in self.secondaries[evtype].get(fileno, []) + if l is not listener] + if sec: + self.secondaries[evtype][fileno] = sec + else: + self.secondaries[evtype].pop(fileno, None) def mark_as_reopened(self, fileno): """ If a file descriptor is returned by the OS as the result of some diff --git a/tests/hub_test.py b/tests/hub_test.py index 61b5b0b977..d1a003e414 100644 --- a/tests/hub_test.py +++ b/tests/hub_test.py @@ -1,4 +1,7 @@ from __future__ import with_statement +import errno +import fcntl +import os import sys import time @@ -6,7 +9,7 @@ from tests import skip_with_pyevent, skip_if_no_itimer, skip_unless from tests.patcher_test import ProcessBase import eventlet -from eventlet import hubs +from eventlet import debug, hubs from eventlet.support import greenlets import six @@ -84,6 +87,48 @@ def test_cancel_proportion(self): eventlet.sleep() +class TestMultipleListenersCleanup(tests.LimitedTestCase): + def setUp(self): + super(TestMultipleListenersCleanup, self).setUp() + debug.hub_prevent_multiple_readers(False) + debug.hub_exceptions(False) + + def tearDown(self): + super(TestMultipleListenersCleanup, self).tearDown() + debug.hub_prevent_multiple_readers(True) + debug.hub_exceptions(True) + + def test_cleanup(self): + r, w = os.pipe() + self.addCleanup(os.close, r) + self.addCleanup(os.close, w) + + fcntl.fcntl(r, fcntl.F_SETFL, + fcntl.fcntl(r, fcntl.F_GETFL) | os.O_NONBLOCK) + + def readfd(fd): + while True: + try: + return os.read(fd, 1) + except OSError as e: + if e.errno != errno.EAGAIN: + raise + hubs.trampoline(fd, read=True) + + first_listener = eventlet.spawn(readfd, r) + eventlet.sleep() + + second_listener = eventlet.spawn(readfd, r) + eventlet.sleep() + + hubs.get_hub().schedule_call_global(0, second_listener.throw, + eventlet.Timeout(None)) + eventlet.sleep() + + os.write(w, b'.') + self.assertEqual(first_listener.wait(), b'.') + + class TestScheduleCall(tests.LimitedTestCase): def test_local(self):