Skip to content

Commit

Permalink
[FIX] server, bus: filedescriptor out of range in select
Browse files Browse the repository at this point in the history
Use the most efficient Selector implementation available on the current platform

Odoo supports only SelectSelector but it is a little obsolete

python >= 3.4 supports a new high-level library Selectors:
 - https://docs.python.org/es/3/library/selectors.html

It could to auto-choose the following ones:
 - SelectSelector
 - PollSelector
 - EpollSelector
 - DevpollSelector
 - KqueueSelector

Using the DefaultSelector class the most efficient implementation available on the current platform will be use:
 - https://docs.python.org/3/library/selectors.html#selectors.DefaultSelector

It helps to support better the resources of the system

Using SelectSelector you are not able to run workers >=255

If you set `ulimit -n 10240` and run `odoo-bin --workers=255`
the following error is raised:

    Traceback (most recent call last):
    File "odoo/service/server.py", line 926, in run
        self.sleep()
    File "odoo/service/server.py", line 852, in sleep
        sel.select(self.beat)
    File "python3.8/lib/python3.8/selectors.py", line 323, in select
        r, w, _ = self._select(self._readers, self._writers, [], timeout)
    ValueError: filedescriptor out of range in select()

But using PollSelector it is not reproduced even using more workers

Most of platform supports PollSelector but using DefaultSelector we can be sure
that even too old system are supported too

And using this High-level library will allow to use the future new improvements

e.g. Epoll has better performance improvements

More info about:
 - https://devarea.com/linux-io-multiplexing-select-vs-poll-vs-epoll
 - redis/redis-py#486
  • Loading branch information
moylop260 committed Feb 21, 2022
1 parent 69e31db commit 19982a2
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 31 deletions.
9 changes: 6 additions & 3 deletions addons/bus/models/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import json
import logging
import random
import select
import selectors
import threading
import time

Expand All @@ -15,6 +15,8 @@

_logger = logging.getLogger(__name__)

select = selectors.DefaultSelector

# longpolling timeout connection
TIMEOUT = 50

Expand Down Expand Up @@ -167,12 +169,13 @@ def poll(self, dbname, channels, last, options=None, timeout=None):
def loop(self):
""" Dispatch postgres notifications to the relevant polling threads/greenlets """
_logger.info("Bus.loop listen imbus on db postgres")
with odoo.sql_db.db_connect('postgres').cursor() as cr:
with odoo.sql_db.db_connect('postgres').cursor() as cr, select() as sel:
conn = cr._cnx
cr.execute("listen imbus")
cr.commit();
sel.register(conn, selectors.EVENT_READ)
while True:
if select.select([conn], [], [], TIMEOUT) == ([], [], []):
if not sel.select(timeout=TIMEOUT):
pass
else:
conn.poll()
Expand Down
65 changes: 37 additions & 28 deletions odoo/service/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import os.path
import platform
import random
import select
import selectors
import signal
import socket
import subprocess
Expand Down Expand Up @@ -62,6 +62,8 @@

_logger = logging.getLogger(__name__)

select = selectors.DefaultSelector

SLEEP_INTERVAL = 60 # 1 min

def memory_info(process):
Expand Down Expand Up @@ -428,13 +430,13 @@ def cron_thread(self, number):

from odoo.addons.base.models.ir_cron import ir_cron
conn = odoo.sql_db.db_connect('postgres')
with conn.cursor() as cr:
with conn.cursor() as cr, select() as sel:
pg_conn = cr._cnx
cr.execute("LISTEN cron_trigger")
cr.commit()

sel.register(pg_conn, selectors.EVENT_READ)
while True:
select.select([pg_conn], [], [], SLEEP_INTERVAL + number)
sel.select(timeout=SLEEP_INTERVAL + number)
time.sleep(number / 100)
pg_conn.poll()

Expand Down Expand Up @@ -833,17 +835,19 @@ def sleep(self):
try:
# map of fd -> worker
fds = {w.watchdog_pipe[0]: w for w in self.workers.values()}
fd_in = list(fds) + [self.pipe[0]]
# check for ping or internal wakeups
ready = select.select(fd_in, [], [], self.beat)
# update worker watchdogs
for fd in ready[0]:
if fd in fds:
fds[fd].watchdog_time = time.time()
empty_pipe(fd)
except select.error as e:
if e.args[0] not in [errno.EINTR]:
raise
with select() as sel:
for fd in fds:
sel.register(fd, selectors.EVENT_READ)
sel.register(self.pipe[0], selectors.EVENT_READ)
events = sel.select(timeout=self.beat)
for key, _mask in events:
# update worker watchdogs
if key.fd in fds:
fds[key.fd].watchdog_time = time.time()
empty_pipe(key.fd)
except InterruptedError:
pass

def start(self):
# wakeup pipe, python doesn't throw EINTR when a syscall is interrupted
Expand Down Expand Up @@ -963,12 +967,15 @@ def signal_time_expired_handler(self, n, stack):

def sleep(self):
try:
select.select([self.multi.socket, self.wakeup_fd_r], [], [], self.multi.beat)
# clear wakeup pipe if we were interrupted
empty_pipe(self.wakeup_fd_r)
except select.error as e:
if e.args[0] not in [errno.EINTR]:
raise
with select() as sel:
sel.register(self.multi.socket, selectors.EVENT_READ)
sel.register(self.wakeup_fd_r, selectors.EVENT_READ)
sel.select(timeout=self.multi.beat)

# clear wakeup pipe if we were interrupted
empty_pipe(self.wakeup_fd_r)
except InterruptedError:
pass

def check_limits(self):
# If our parent changed suicide
Expand Down Expand Up @@ -1117,14 +1124,16 @@ def sleep(self):

# simulate interruptible sleep with select(wakeup_fd, timeout)
try:
select.select([self.wakeup_fd_r, self.dbcursor._cnx], [], [], interval)
# clear pg_conn/wakeup pipe if we were interrupted
time.sleep(self.pid / 100 % .1)
self.dbcursor._cnx.poll()
empty_pipe(self.wakeup_fd_r)
except select.error as e:
if e.args[0] != errno.EINTR:
raise
with select() as sel:
sel.register(self.wakeup_fd_r, selectors.EVENT_READ)
sel.register(self.dbcursor._cnx, selectors.EVENT_READ)
sel.select(timeout=interval)
# clear pg_conn/wakeup pipe if we were interrupted
time.sleep(self.pid / 100 % .1)
self.dbcursor._cnx.poll()
empty_pipe(self.wakeup_fd_r)
except InterruptedError:
pass

def _db_list(self):
if config['db_name']:
Expand Down

0 comments on commit 19982a2

Please sign in to comment.