Add a concurrency model with ThreadPoolExecutor (celery#5099)
* Add a concurrency model with ThreadPoolExecutor

* thread model test for pypy
alfred-sa authored and jeyrce committed Aug 25, 2021
1 parent c3806c4 commit 438deee
Showing 13 changed files with 106 additions and 3 deletions.
1 change: 1 addition & 0 deletions appveyor.yml
@@ -38,6 +38,7 @@ install:
- "powershell extra\\appveyor\\install.ps1"
- "%PYTHON%/python -m pip install -U pip setuptools"
- "%PYTHON%/Scripts/pip.exe install -U eventlet"
- "%PYTHON%/Scripts/pip.exe install -U -r requirements/extras/thread.txt"

build: off

7 changes: 7 additions & 0 deletions celery/concurrency/__init__.py
@@ -17,6 +17,13 @@
    'processes': 'celery.concurrency.prefork:TaskPool',  # XXX compat alias
}

try:
    import concurrent.futures  # noqa: F401
except ImportError:
    pass
else:
    ALIASES['threads'] = 'celery.concurrency.thread:TaskPool'


def get_implementation(cls):
"""Return pool implementation by name."""
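With the 'threads' alias registered above, get_implementation can resolve the new pool class by name. A minimal sketch, assuming concurrent.futures is importable so the alias was actually added:

# minimal sketch: resolving the new pool class through its registered alias
from celery.concurrency import get_implementation

TaskPool = get_implementation('threads')
print(TaskPool)  # <class 'celery.concurrency.thread.TaskPool'>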
53 changes: 53 additions & 0 deletions celery/concurrency/thread.py
@@ -0,0 +1,53 @@
# -*- coding: utf-8 -*-
"""Thread execution pool."""
from __future__ import absolute_import, unicode_literals

import sys

from concurrent.futures import wait, ThreadPoolExecutor
from .base import BasePool, apply_target

__all__ = ('TaskPool',)


class ApplyResult(object):
    def __init__(self, future):
        self.f = future
        self.get = self.f.result

    def wait(self, timeout=None):
        wait([self.f], timeout)


class TaskPool(BasePool):
"""Thread Task Pool."""

body_can_be_buffer = True
signal_safe = False

def __init__(self, *args, **kwargs):
super(TaskPool, self).__init__(*args, **kwargs)

# from 3.5, it is calculated from number of CPUs
if (3, 0) <= sys.version_info < (3, 5) and self.limit is None:
self.limit = 5

self.executor = ThreadPoolExecutor(max_workers=self.limit)

def on_stop(self):
self.executor.shutdown()
super(TaskPool, self).on_stop()

def on_apply(self, target, args=None, kwargs=None, callback=None,
accept_callback=None, **_):
f = self.executor.submit(apply_target, target, args, kwargs,
callback, accept_callback)
return ApplyResult(f)

def _get_info(self):
return {
'max-concurrency': self.limit,
'threads': len(self.executor._threads)
# TODO use a public api to retrieve the current number of threads
# in the executor when available. (Currently not available).
}
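As a rough illustration of the API above, the pool can be driven directly, mirroring the unit tests added below rather than any supported public workflow; start() and stop() are assumed from BasePool:

# rough sketch only; mirrors the unit tests below, not a supported public API
from celery.concurrency.thread import TaskPool

pool = TaskPool(limit=4)              # at most four worker threads
pool.start()
res = pool.on_apply(sum, ([1, 2, 3],), {},
                    callback=print,   # callback receives the target's return value (6)
                    accept_callback=None)
res.wait(timeout=5)                   # ApplyResult.wait delegates to concurrent.futures.wait
pool.stop()                           # on_stop() shuts the executor down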
1 change: 1 addition & 0 deletions docs/getting-started/introduction.rst
@@ -135,6 +135,7 @@ Celery is…

- prefork (multiprocessing),
- Eventlet_, gevent_
- thread (multithreaded)
- `solo` (single threaded)

- **Result Stores**
2 changes: 1 addition & 1 deletion docs/includes/introduction.txt
@@ -134,7 +134,7 @@ It supports…

- **Concurrency**

- - Prefork, Eventlet_, gevent_, single threaded (``solo``)
+ - Prefork, Eventlet_, gevent_, single threaded (``solo``), thread

- **Result Stores**

2 changes: 1 addition & 1 deletion docs/internals/guide.rst
@@ -267,7 +267,7 @@ Module Overview

- celery.concurrency

- Execution pool implementations (prefork, eventlet, gevent, solo).
+ Execution pool implementations (prefork, eventlet, gevent, solo, thread).

- celery.db

11 changes: 11 additions & 0 deletions docs/internals/reference/celery.concurrency.thread.rst
@@ -0,0 +1,11 @@
=============================================================
``celery.concurrency.thread``
=============================================================

.. contents::
    :local:
.. currentmodule:: celery.concurrency.thread

.. automodule:: celery.concurrency.thread
    :members:
    :undoc-members:
1 change: 1 addition & 0 deletions docs/internals/reference/index.rst
@@ -19,6 +19,7 @@
celery.concurrency.prefork
celery.concurrency.eventlet
celery.concurrency.gevent
celery.concurrency.thread
celery.concurrency.base
celery.backends
celery.backends.base
2 changes: 1 addition & 1 deletion docs/userguide/workers.rst
@@ -244,7 +244,7 @@ Remote control
commands from the command-line. It supports all of the commands
listed below. See :ref:`monitoring-control` for more information.

- :pool support: *prefork, eventlet, gevent*, blocking:*solo* (see note)
+ :pool support: *prefork, eventlet, gevent, thread*, blocking:*solo* (see note)
:broker support: *amqp, redis*

Workers have the ability to be remote controlled using a high-priority
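With the pool registered, a worker can be pointed at it like any other pool. A sketch assuming a project app named proj and a local Redis broker (the worker_pool setting and the worker's --pool/-P option both take the alias):

# sketch: selecting the thread pool by its 'threads' alias (app/broker names assumed)
from celery import Celery

app = Celery('proj', broker='redis://localhost:6379/0')
app.conf.worker_pool = 'threads'     # or on the command line: celery worker -A proj -P threads
app.conf.worker_concurrency = 8      # roughly TaskPool(limit=8) -> ThreadPoolExecutor(max_workers=8)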
1 change: 1 addition & 0 deletions requirements/extras/thread.txt
@@ -0,0 +1 @@
futures>=3.1.1; python_version < '3.0'
1 change: 1 addition & 0 deletions requirements/test-ci-base.txt
@@ -5,4 +5,5 @@ codecov
-r extras/redis.txt
-r extras/sqlalchemy.txt
-r extras/pymemcache.txt
-r extras/thread.txt
-r extras/auth.txt
1 change: 1 addition & 0 deletions requirements/test-ci-default.txt
@@ -11,6 +11,7 @@
-r extras/memcache.txt
-r extras/eventlet.txt
-r extras/gevent.txt
-r extras/thread.txt
-r extras/elasticsearch.txt
-r extras/couchdb.txt
-r extras/couchbase.txt
26 changes: 26 additions & 0 deletions t/unit/concurrency/test_thread.py
@@ -0,0 +1,26 @@
from __future__ import absolute_import, unicode_literals

import operator
import pytest

from celery.utils.functional import noop


class test_thread_TaskPool:

    def test_on_apply(self):
        from celery.concurrency import thread
        x = thread.TaskPool()
        x.on_apply(operator.add, (2, 2), {}, noop, noop)

    def test_info(self):
        from celery.concurrency import thread
        x = thread.TaskPool()
        assert x.info

    def test_on_stop(self):
        from celery.concurrency import thread
        x = thread.TaskPool()
        x.on_stop()
        with pytest.raises(RuntimeError):
            x.on_apply(operator.add, (2, 2), {}, noop, noop)
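The RuntimeError expected in test_on_stop is standard-library behaviour: ThreadPoolExecutor.submit refuses new work after shutdown(), which is exactly what TaskPool.on_stop calls. A standalone illustration:

# standalone illustration of the behaviour test_on_stop relies on
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)
executor.shutdown()
try:
    executor.submit(print, 'too late')
except RuntimeError as exc:
    print(exc)  # "cannot schedule new futures after shutdown"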
