Skip to content

Commit

Permalink
[IMP] core: support more than 255 http workers
Browse files Browse the repository at this point in the history
Use the most efficient Selector implementation available on the current platform

Odoo supports only SelectSelector but it is a little obsolete

python >= 3.4 supports a new high-level library Selectors:
 - https://docs.python.org/es/3/library/selectors.html

It could to auto-choose the following ones:
 - SelectSelector
 - PollSelector
 - EpollSelector
 - DevpollSelector
 - KqueueSelector

Using the DefaultSelector class the most efficient implementation available on the current platform will be use:
 - https://docs.python.org/3/library/selectors.html#selectors.DefaultSelector

It helps to support better the resources of the system

Using SelectSelector you are not able to run workers >=255

If you set `ulimit -n 10240` and run `odoo-bin --workers=255`
the following error is raised:

    Traceback (most recent call last):
    File "odoo/service/server.py", line 926, in run
        self.sleep()
    File "odoo/service/server.py", line 852, in sleep
        sel.select(self.beat)
    File "python3.8/lib/python3.8/selectors.py", line 323, in select
        r, w, _ = self._select(self._readers, self._writers, [], timeout)
    ValueError: filedescriptor out of range in select()

But using PollSelector it is not reproduced even using more workers

Most of platform supports PollSelector but using DefaultSelector we can be sure
that even too old system are supported too

And using this High-level library will allow to use the future new improvements

e.g. Epoll has better performance improvements

More info about:
 - https://devarea.com/linux-io-multiplexing-select-vs-poll-vs-epoll
 - redis/redis-py#486

Co-Authored-By: Julien Castiaux <juc@odoo.com>
  • Loading branch information
moylop260 and Julien00859 committed Aug 22, 2023
1 parent 92c9df5 commit ae19259
Showing 1 changed file with 47 additions and 40 deletions.
87 changes: 47 additions & 40 deletions odoo/service/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import os.path
import platform
import random
import select
import selectors
import signal
import socket
import subprocess
Expand Down Expand Up @@ -435,7 +435,8 @@ def cron_thread(self, number):

from odoo.addons.base.models.ir_cron import ir_cron
conn = odoo.sql_db.db_connect('postgres')
with conn.cursor() as cr:
with conn.cursor() as cr, \
selectors.DefaultSelector() as sel:
pg_conn = cr._cnx
# LISTEN / NOTIFY doesn't work in recovery mode
cr.execute("SELECT pg_is_in_recovery()")
Expand All @@ -446,8 +447,9 @@ def cron_thread(self, number):
_logger.warning("PG cluster in recovery mode, cron trigger not activated")
cr.commit()

sel.register(pg_conn, selectors.EVENT_READ)
while True:
select.select([pg_conn], [], [], SLEEP_INTERVAL + number)
sel.select(timeout=SLEEP_INTERVAL + number)
time.sleep(number / 100)
pg_conn.poll()

Expand Down Expand Up @@ -780,8 +782,11 @@ def worker_spawn(self, klass, workers_registry):
worker.pid = pid
self.workers[pid] = worker
workers_registry[pid] = worker
self.selector.register(worker.watchdog_pipe[0], selectors.EVENT_READ)
return worker
else:
self.selector.close()
del self.selector
worker.run()
sys.exit(0)

Expand All @@ -800,6 +805,7 @@ def worker_pop(self, pid):
self.workers_http.pop(pid, None)
self.workers_cron.pop(pid, None)
worker = self.workers.pop(pid)
self.selector.unregister(worker.watchdog_pipe[0])
worker.close()
except OSError:
return
Expand Down Expand Up @@ -871,26 +877,24 @@ def process_spawn(self):
self.worker_spawn(WorkerCron, self.workers_cron)

def sleep(self):
try:
# map of fd -> worker
fds = {w.watchdog_pipe[0]: w for w in self.workers.values()}
fd_in = list(fds) + [self.pipe[0]]
# check for ping or internal wakeups
ready = select.select(fd_in, [], [], self.beat)
# map of fd -> worker
fds = {w.watchdog_pipe[0]: w for w in self.workers.values()}
# check for ping or internal wakeups
events = self.selector.select(timeout=self.beat)
# update worker watchdogs
for key, _mask in events:
# update worker watchdogs
for fd in ready[0]:
if fd in fds:
fds[fd].watchdog_time = time.time()
empty_pipe(fd)
except select.error as e:
if e.args[0] not in [errno.EINTR]:
raise
if key.fd in fds:
fds[key.fd].watchdog_time = time.time()
empty_pipe(key.fd)

def start(self):
# wakeup pipe, python doesn't throw EINTR when a syscall is interrupted
# by a signal simulating a pseudo SA_RESTART. We write to a pipe in the
# signal handler to overcome this behaviour
self.pipe = self.pipe_new()
self.selector = selectors.DefaultSelector()
self.selector.register(self.pipe[0], selectors.EVENT_READ)
# set signal handlers
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
Expand Down Expand Up @@ -938,6 +942,7 @@ def stop(self, graceful=True):
_logger.info("Stopping forcefully")
for pid in self.workers:
self.worker_kill(pid, signal.SIGTERM)
self.selector.close()

def run(self, preload, stop):
self.start()
Expand Down Expand Up @@ -976,6 +981,7 @@ def __init__(self, multi: PreforkServer):
self.watchdog_time = time.time()
self.watchdog_pipe = multi.pipe_new()
self.wakeup_fd_r, self.wakeup_fd_w = multi.pipe_new()
self.selector = None
# Can be set to None if no watchdog is desired.
self.watchdog_timeout = multi.timeout
self.ppid = os.getpid()
Expand Down Expand Up @@ -1005,13 +1011,7 @@ def signal_time_expired_handler(self, n, stack):
raise Exception('CPU time limit exceeded.')

def sleep(self):
try:
select.select([self.multi.socket, self.wakeup_fd_r], [], [], self.multi.beat)
# clear wakeup pipe if we were interrupted
empty_pipe(self.wakeup_fd_r)
except select.error as e:
if e.args[0] not in [errno.EINTR]:
raise
pass

def check_limits(self):
# If our parent changed suicide
Expand Down Expand Up @@ -1041,16 +1041,11 @@ def process_work(self):

def start(self):
self.pid = os.getpid()
self.selector = selectors.DefaultSelector()
self.setproctitle()
_logger.info("Worker %s (%s) alive", self.__class__.__name__, self.pid)
# Reseed the random number generator
random.seed()
if self.multi.socket:
# Prevent fd inheritance: close_on_exec
flags = fcntl.fcntl(self.multi.socket, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
fcntl.fcntl(self.multi.socket, fcntl.F_SETFD, flags)
# reset blocking status
self.multi.socket.setblocking(0)

signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGXCPU, self.signal_time_expired_handler)
Expand All @@ -1062,9 +1057,10 @@ def start(self):
signal.signal(signal.SIGTTOU, signal.SIG_DFL)

signal.set_wakeup_fd(self.wakeup_fd_w)
self.selector.register(self.wakeup_fd_r, selectors.EVENT_READ)

def stop(self):
pass
self.selector.close()

def run(self):
try:
Expand Down Expand Up @@ -1141,8 +1137,21 @@ def process_work(self):

def start(self):
super().start()

# Prevent fd inheritance: close_on_exec
flags = fcntl.fcntl(self.multi.socket, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
fcntl.fcntl(self.multi.socket, fcntl.F_SETFD, flags)
# reset blocking status
self.multi.socket.setblocking(0)
self.selector.register(self.multi.socket, selectors.EVENT_READ)

self.server = BaseWSGIServerNoBind(self.multi.app)

def sleep(self):
self.selector.select(timeout=self.multi.beat)
# clear wakeup pipe if we were interrupted
empty_pipe(self.wakeup_fd_r)

class WorkerCron(Worker):
""" Cron workers """

Expand All @@ -1161,15 +1170,11 @@ def sleep(self):
interval = SLEEP_INTERVAL + self.pid % 10 # chorus effect

# simulate interruptible sleep with select(wakeup_fd, timeout)
try:
select.select([self.wakeup_fd_r, self.dbcursor._cnx], [], [], interval)
# clear pg_conn/wakeup pipe if we were interrupted
time.sleep(self.pid / 100 % .1)
self.dbcursor._cnx.poll()
empty_pipe(self.wakeup_fd_r)
except select.error as e:
if e.args[0] != errno.EINTR:
raise
self.selector.select(interval)
# clear pg_conn/wakeup pipe if we were interrupted
time.sleep(self.pid / 100 % .1) # prevent thundering herd
self.dbcursor._cnx.poll()
empty_pipe(self.wakeup_fd_r)

def _db_list(self):
if config['db_name']:
Expand Down Expand Up @@ -1214,13 +1219,15 @@ def start(self):
in_recovery = self.dbcursor.fetchone()[0]
if not in_recovery:
self.dbcursor.execute("LISTEN cron_trigger")
self.selector.register(self.dbcursor._cnx, selectors.EVENT_READ)
else:
_logger.warning("PG cluster in recovery mode, cron trigger not activated")
self.dbcursor.commit()

def stop(self):
if self.dbcursor:
self.dbcursor.close()
super().stop()
self.dbcursor.close()

#----------------------------------------------------------
# start/stop public api
Expand Down

0 comments on commit ae19259

Please sign in to comment.