Skip to content

Commit

Permalink
Always remove the right listener from the hub
Browse files Browse the repository at this point in the history
When in hubs.trampoline(fd, ...), a greenthread registers itself as a
listener for fd, switches to the hub, and then calls
hub.remove(listener) to deregister itself. hub.remove(listener)
removes the primary listener. If the greenthread awoke because its fd
became ready, then it is the primary listener, and everything is
fine. However, if the greenthread was a secondary listener and awoke
because a Timeout fired then it would remove the primary and promote a
random secondary to primary.

This commit makes hub.remove(listener) check to make sure listener is
the primary, and if it's not, remove the listener from the
secondaries.
  • Loading branch information
smerritt committed Jun 8, 2018
1 parent af407c7 commit a56a722
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 10 deletions.
26 changes: 17 additions & 9 deletions eventlet/hubs/hub.py
Expand Up @@ -224,15 +224,23 @@ def remove(self, listener):

fileno = listener.fileno
evtype = listener.evtype
self.listeners[evtype].pop(fileno, None)
# migrate a secondary listener to be the primary listener
if fileno in self.secondaries[evtype]:
sec = self.secondaries[evtype].get(fileno, None)
if not sec:
return
self.listeners[evtype][fileno] = sec.pop(0)
if not sec:
del self.secondaries[evtype][fileno]
if listener is self.listeners[evtype].get(fileno):
self.listeners[evtype].pop(fileno, None)
# migrate a secondary listener to be the primary listener
if fileno in self.secondaries[evtype]:
sec = self.secondaries[evtype].get(fileno, None)
if not sec:
return
self.listeners[evtype][fileno] = sec.pop(0)
if not sec:
del self.secondaries[evtype][fileno]
else:
sec = [l for l in self.secondaries[evtype].get(fileno, [])
if l is not listener]
if sec:
self.secondaries[evtype][fileno] = sec
else:
self.secondaries[evtype].pop(fileno, None)

def mark_as_reopened(self, fileno):
""" If a file descriptor is returned by the OS as the result of some
Expand Down
47 changes: 46 additions & 1 deletion tests/hub_test.py
@@ -1,12 +1,15 @@
from __future__ import with_statement
import errno
import fcntl
import os
import sys
import time

import tests
from tests import skip_with_pyevent, skip_if_no_itimer, skip_unless
from tests.patcher_test import ProcessBase
import eventlet
from eventlet import hubs
from eventlet import debug, hubs
from eventlet.support import greenlets
import six

Expand Down Expand Up @@ -84,6 +87,48 @@ def test_cancel_proportion(self):
eventlet.sleep()


class TestMultipleListenersCleanup(tests.LimitedTestCase):
def setUp(self):
super(TestMultipleListenersCleanup, self).setUp()
debug.hub_prevent_multiple_readers(False)
debug.hub_exceptions(False)

def tearDown(self):
super(TestMultipleListenersCleanup, self).tearDown()
debug.hub_prevent_multiple_readers(True)
debug.hub_exceptions(True)

def test_cleanup(self):
r, w = os.pipe()
self.addCleanup(os.close, r)
self.addCleanup(os.close, w)

fcntl.fcntl(r, fcntl.F_SETFL,
fcntl.fcntl(r, fcntl.F_GETFL) | os.O_NONBLOCK)

def readfd(fd):
while True:
try:
return os.read(fd, 1)
except OSError as e:
if e.errno != errno.EAGAIN:
raise
hubs.trampoline(fd, read=True)

first_listener = eventlet.spawn(readfd, r)
eventlet.sleep()

second_listener = eventlet.spawn(readfd, r)
eventlet.sleep()

hubs.get_hub().schedule_call_global(0, second_listener.throw,
eventlet.Timeout(None))
eventlet.sleep()

os.write(w, b'.')
self.assertEqual(first_listener.wait(), b'.')


class TestScheduleCall(tests.LimitedTestCase):

def test_local(self):
Expand Down

0 comments on commit a56a722

Please sign in to comment.