Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a concurrency model with ThreadPoolExecutor #5099

Merged
merged 5 commits into from Nov 24, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions appveyor.yml
Expand Up @@ -38,6 +38,7 @@ install:
- "powershell extra\\appveyor\\install.ps1"
- "%PYTHON%/Scripts/pip.exe install -U setuptools"
- "%PYTHON%/Scripts/pip.exe install -U eventlet"
- "%PYTHON%/Scripts/pip.exe install -U -r requirements/extras/thread.txt"

build: off

Expand Down
7 changes: 7 additions & 0 deletions celery/concurrency/__init__.py
Expand Up @@ -17,6 +17,13 @@
'processes': 'celery.concurrency.prefork:TaskPool', # XXX compat alias
}

try:
import concurrent.futures # noqa: F401
except ImportError:
pass
else:
ALIASES['threads'] = 'celery.concurrency.thread:TaskPool'


def get_implementation(cls):
"""Return pool implementation by name."""
Expand Down
53 changes: 53 additions & 0 deletions celery/concurrency/thread.py
@@ -0,0 +1,53 @@
# -*- coding: utf-8 -*-
"""Thread execution pool."""
from __future__ import absolute_import, unicode_literals

import sys

from concurrent.futures import wait, ThreadPoolExecutor
from .base import BasePool, apply_target

__all__ = ('TaskPool',)


class ApplyResult(object):
def __init__(self, future):
self.f = future
self.get = self.f.result

def wait(self, timeout=None):
wait([self.f], timeout)


class TaskPool(BasePool):
"""Thread Task Pool."""

body_can_be_buffer = True
signal_safe = False

def __init__(self, *args, **kwargs):
super(TaskPool, self).__init__(*args, **kwargs)

# from 3.5, it is calculated from number of CPUs
if (3, 0) <= sys.version_info < (3, 5) and self.limit is None:
self.limit = 5

self.executor = ThreadPoolExecutor(max_workers=self.limit)

def on_stop(self):
self.executor.shutdown()
super(TaskPool, self).on_stop()

def on_apply(self, target, args=None, kwargs=None, callback=None,
accept_callback=None, **_):
f = self.executor.submit(apply_target, target, args, kwargs,
callback, accept_callback)
return ApplyResult(f)

def _get_info(self):
return {
'max-concurrency': self.limit,
'threads': len(self.executor._threads)
# TODO use a public api to retrieve the current number of threads
# in the executor when available. (Currently not available).
}
1 change: 1 addition & 0 deletions docs/getting-started/introduction.rst
Expand Up @@ -134,6 +134,7 @@ Celery is…

- prefork (multiprocessing),
- Eventlet_, gevent_
- thread (multithreaded)
alfred-sa marked this conversation as resolved.
Show resolved Hide resolved
- `solo` (single threaded)

- **Result Stores**
Expand Down
2 changes: 1 addition & 1 deletion docs/includes/introduction.txt
Expand Up @@ -132,7 +132,7 @@ It supports…

- **Concurrency**

- Prefork, Eventlet_, gevent_, single threaded (``solo``)
- Prefork, Eventlet_, gevent_, single threaded (``solo``), thread

- **Result Stores**

Expand Down
2 changes: 1 addition & 1 deletion docs/internals/guide.rst
Expand Up @@ -267,7 +267,7 @@ Module Overview

- celery.concurrency

Execution pool implementations (prefork, eventlet, gevent, solo).
Execution pool implementations (prefork, eventlet, gevent, solo, thread).

- celery.db

Expand Down
11 changes: 11 additions & 0 deletions docs/internals/reference/celery.concurrency.thread.rst
@@ -0,0 +1,11 @@
=============================================================
``celery.concurrency.thread``
=============================================================

.. contents::
:local:
.. currentmodule:: celery.concurrency.thread

.. automodule:: celery.concurrency.thread
:members:
:undoc-members:
1 change: 1 addition & 0 deletions docs/internals/reference/index.rst
Expand Up @@ -19,6 +19,7 @@
celery.concurrency.prefork
celery.concurrency.eventlet
celery.concurrency.gevent
celery.concurrency.thread
celery.concurrency.base
celery.backends
celery.backends.base
Expand Down
2 changes: 1 addition & 1 deletion docs/userguide/workers.rst
Expand Up @@ -244,7 +244,7 @@ Remote control
commands from the command-line. It supports all of the commands
listed below. See :ref:`monitoring-control` for more information.

:pool support: *prefork, eventlet, gevent*, blocking:*solo* (see note)
:pool support: *prefork, eventlet, gevent, thread*, blocking:*solo* (see note)
:broker support: *amqp, redis*

Workers have the ability to be remote controlled using a high-priority
Expand Down
1 change: 1 addition & 0 deletions requirements/extras/thread.txt
@@ -0,0 +1 @@
futures>=3.1.1; python_version < '3.0'
1 change: 1 addition & 0 deletions requirements/test-ci-default.txt
Expand Up @@ -12,6 +12,7 @@
-r extras/memcache.txt
-r extras/eventlet.txt
-r extras/gevent.txt
-r extras/thread.txt
-r extras/elasticsearch.txt
-r extras/couchdb.txt
-r extras/couchbase.txt
Expand Down
29 changes: 29 additions & 0 deletions t/unit/concurrency/test_thread.py
@@ -0,0 +1,29 @@
from __future__ import absolute_import, unicode_literals

import operator
import pytest

from celery.utils.functional import noop

from case import skip


@skip.if_pypy()
alfred-sa marked this conversation as resolved.
Show resolved Hide resolved
class test_thread_TaskPool:

def test_on_apply(self):
from celery.concurrency import thread
x = thread.TaskPool()
x.on_apply(operator.add, (2, 2), {}, noop, noop)

def test_info(self):
from celery.concurrency import thread
x = thread.TaskPool()
assert x.info

def test_on_stop(self):
from celery.concurrency import thread
x = thread.TaskPool()
x.on_stop()
with pytest.raises(RuntimeError):
x.on_apply(operator.add, (2, 2), {}, noop, noop)