Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Atfork #68

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions eventlet/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
__version__ = ".".join(map(str, version_info))

try:
from eventlet import atfork
from eventlet import greenthread
from eventlet import greenpool
from eventlet import queue
Expand Down
18 changes: 18 additions & 0 deletions eventlet/atfork.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import os


_child_callbacks = []


def register_child_callback(callback):
_child_callbacks.append(callback)


__original_fork__ = os.fork
def _patched_fork():
pid = __original_fork__()
if pid == 0:
for callback in _child_callbacks:
callback()
return pid
os.fork = _patched_fork
7 changes: 7 additions & 0 deletions eventlet/hubs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import sys
import os
import eventlet
from eventlet.support import greenlets as greenlet
from eventlet import patcher

Expand Down Expand Up @@ -158,3 +159,9 @@ def trampoline(fd, read=None, write=None, timeout=None,
finally:
if t is not None:
t.cancel()


def _del_hub():
if hasattr(_threadlocal, 'hub'):
del _threadlocal.hub
eventlet.atfork.register_child_callback(_del_hub)
16 changes: 5 additions & 11 deletions eventlet/hubs/epolls.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import errno
from eventlet.support import get_errno
from eventlet import patcher
time = patcher.original('time')
select = patcher.original("select")
Expand Down Expand Up @@ -46,15 +44,11 @@ def add(self, evtype, fileno, cb):
oldlisteners = bool(self.listeners[READ].get(fileno) or
self.listeners[WRITE].get(fileno))
listener = BaseHub.add(self, evtype, fileno, cb)
try:
if not oldlisteners:
# Means we've added a new listener
self.register(fileno, new=True)
else:
self.register(fileno, new=False)
except IOError, ex: # ignore EEXIST, #80
if get_errno(ex) != errno.EEXIST:
raise
if not oldlisteners:
# Means we've added a new listener
self.register(fileno, new=True)
else:
self.register(fileno, new=False)
return listener

def do_poll(self, seconds):
Expand Down
31 changes: 4 additions & 27 deletions eventlet/hubs/kqueue.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import os
import sys
from eventlet import patcher
select = patcher.original('select')
time = patcher.original('time')
sleep = time.sleep

from eventlet.support import get_errno, clear_sys_exc_info
from eventlet.support import clear_sys_exc_info
from eventlet.hubs.hub import BaseHub, READ, WRITE, noop


Expand All @@ -23,29 +22,7 @@ class Hub(BaseHub):
def __init__(self, clock=time.time):
super(Hub, self).__init__(clock)
self._events = {}
self._init_kqueue()

def _init_kqueue(self):
self.kqueue = select.kqueue()
self._pid = os.getpid()

def _reinit_kqueue(self):
self.kqueue.close()
self._init_kqueue()
kqueue = self.kqueue
events = [e for i in self._events.itervalues()
for e in i.itervalues()]
kqueue.control(events, 0, 0)

def _control(self, events, max_events, timeout):
try:
return self.kqueue.control(events, max_events, timeout)
except OSError:
# have we forked?
if os.getpid() != self._pid:
self._reinit_kqueue()
return self.kqueue.control(events, max_events, timeout)
raise

def add(self, evtype, fileno, cb):
listener = super(Hub, self).add(evtype, fileno, cb)
Expand All @@ -54,7 +31,7 @@ def add(self, evtype, fileno, cb):
try:
event = select.kevent(fileno,
FILTERS.get(evtype), select.KQ_EV_ADD)
self._control([event], 0, 0)
self.kqueue.control([event], 0, 0)
events[evtype] = event
except ValueError:
super(Hub, self).remove(listener)
Expand All @@ -64,7 +41,7 @@ def add(self, evtype, fileno, cb):
def _delete_events(self, events):
del_events = map(lambda e: select.kevent(e.ident, e.filter,
select.KQ_EV_DELETE), events)
self._control(del_events, 0, 0)
self.kqueue.control(del_events, 0, 0)

def remove(self, listener):
super(Hub, self).remove(listener)
Expand Down Expand Up @@ -95,7 +72,7 @@ def wait(self, seconds=None):
if seconds:
sleep(seconds)
return
result = self._control([], self.MAX_EVENTS, seconds)
result = self.kqueue.control([], self.MAX_EVENTS, seconds)
SYSTEM_EXCEPTIONS = self.SYSTEM_EXCEPTIONS
for event in result:
fileno = event.ident
Expand Down
35 changes: 14 additions & 21 deletions tests/hub_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,37 +295,30 @@ def test_repeated_selects(self):

class TestFork(ProcessBase):

@skip_with_pyevent
def test_fork(self):
new_mod = """
import os
import eventlet
import eventlet.hubs

eventlet.hubs.get_hub() # create the hub *before* forking
server = eventlet.listen(('localhost', 12345))
t = eventlet.Timeout(0.01)
os.fork()
try:
eventlet.Timeout(0.5)
new_sock, address = server.accept()
except eventlet.Timeout, t:
pass

pid = os.fork()
if not pid:
t = eventlet.Timeout(0.1)
try:
new_sock, address = server.accept()
except eventlet.Timeout, t:
print "accept blocked"

else:
kpid, status = os.wait()
assert kpid == pid
assert status == 0
print "child died ok"
except eventlet.Timeout:
print 'timeout'
except IOError:
print 'ioerror'
"""
self.write_to_tempfile("newmod", new_mod)
output, lines = self.launch_subprocess('newmod.py')
self.assertEqual(len(lines), 3, output)
self.assert_("accept blocked" in lines[0])
self.assert_("child died ok" in lines[1])
# after the fork, the parent and child processes will try to
# register the same file descriptor when they accept(). if an epoll hub
# isn't reset in the child, so that it has its own distinct instance of
# epoll in the kernel, there will be a conflict and it'll throw IOError
self.assertEqual(output, 'timeout\ntimeout\n')


class TestDeadRunLoop(LimitedTestCase):
Expand Down