/
worker.py
380 lines (315 loc) · 12.7 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
"""Worker command-line program.
This module is the 'program-version' of :mod:`celery.worker`.
It does everything necessary to run that module
as an actual application, like installing signal handlers,
platform tweaks, and so on.
"""
import logging
import os
import platform as _platform
import sys
from datetime import datetime
from functools import partial
from billiard.common import REMAP_SIGTERM
from billiard.process import current_process
from kombu.utils.encoding import safe_str
from celery import VERSION_BANNER, platforms, signals
from celery.app import trace
from celery.exceptions import WorkerShutdown, WorkerTerminate
from celery.loaders.app import AppLoader
from celery.platforms import EX_FAILURE, EX_OK, check_privileges, isatty
from celery.utils import static, term
from celery.utils.debug import cry
from celery.utils.imports import qualname
from celery.utils.log import get_logger, in_sighandler, set_in_sighandler
from celery.utils.text import pluralize
from celery.worker import WorkController
# Public API of this module.
__all__ = ('Worker',)
# Module-level logger, also used by the Worker class for its status messages.
logger = get_logger(__name__)
# Interpreter detection flags; they decide which signal-handler installers
# are defined/installed further down in this module.
is_jython = sys.platform.startswith('java')
is_pypy = hasattr(sys, 'pypy_version_info')
# ASCII-art logo. ``Worker.startup_info`` joins entry *i* of this list with
# line *i* of the rendered banner; banner lines beyond the end of the list
# are padded with 16 spaces instead.
ARTLINES = [
' --------------',
'--- ***** -----',
'-- ******* ----',
'- *** --- * ---',
'- ** ----------',
'- ** ----------',
'- ** ----------',
'- ** ----------',
'- *** --- * ---',
'-- ******* ----',
'--- ***** -----',
' --------------',
]
# Startup banner template, filled in by ``Worker.startup_info``.
# NOTE(review): column alignment / blank lines inside this template may have
# been lost in transit -- confirm against the canonical source before release.
BANNER = """\
{hostname} v{version}
{platform} {timestamp}
[config]
.> app: {app}
.> transport: {conninfo}
.> results: {results}
.> concurrency: {concurrency}
.> task events: {events}
[queues]
{queues}
"""
# Extra banner section listing registered tasks; rendered by
# ``Worker.extra_info`` when the log level is INFO or lower.
EXTRA_INFO_FMT = """
[tasks]
{tasks}
"""
def active_thread_count():
    """Return the number of live threads whose name does not start with ``Dummy-``."""
    import threading

    count = 0
    for thread in threading.enumerate():
        if thread.name.startswith('Dummy-'):
            continue
        count += 1
    return count
def safe_say(msg):
    """Print *msg*, preceded by a newline, to the interpreter's original stderr.

    ``sys.__stderr__`` is used rather than ``sys.stderr`` so the message
    bypasses any object that ``sys.stderr`` has been replaced with.
    """
    text = '\n{}'.format(msg)
    print(text, file=sys.__stderr__)
class Worker(WorkController):
    """Worker as a program.

    Adds the command-line program chores on top of
    :class:`~celery.worker.WorkController`: logging setup, the startup
    banner, the process title and the signal handlers.
    """

    def on_before_init(self, quiet=False, **kwargs):
        """Record *quiet*, set up trace optimizations and announce the worker.

        Sends the ``celeryd_init`` signal (with the remaining keyword
        arguments as ``options``) and finally checks process privileges
        against the configured ``accept_content``.
        """
        self.quiet = quiet
        hostname = self.hostname
        trace.setup_worker_optimizations(self.app, hostname)

        # This signal can be used to set up configuration for
        # workers by name.
        signals.celeryd_init.send(
            sender=hostname,
            instance=self,
            conf=self.app.conf,
            options=kwargs,
        )
        check_privileges(self.app.conf.accept_content)

    def on_after_init(self, purge=False, no_color=None,
                      redirect_stdouts=None, redirect_stdouts_level=None,
                      **kwargs):
        """Store program-level options and build the terminal colorizer.

        The stdout redirection options are resolved through
        ``app.either`` against the ``worker_redirect_stdouts*`` settings;
        the remaining keyword arguments go to ``setup_defaults``.
        """
        app = self.app
        self.redirect_stdouts = app.either(
            'worker_redirect_stdouts', redirect_stdouts,
        )
        self.redirect_stdouts_level = app.either(
            'worker_redirect_stdouts_level', redirect_stdouts_level,
        )
        super().setup_defaults(**kwargs)

        self.purge = purge
        self.no_color = no_color
        self._isatty = isatty(sys.stdout)

        # ``None`` is handed through untouched; an explicit ``no_color``
        # value is inverted into ``enabled``.
        color_enabled = None if no_color is None else not no_color
        self.colored = self.app.log.colored(
            self.logfile, enabled=color_enabled,
        )

    def on_init_blueprint(self):
        """Set up logging, then apply the task execution optimizations."""
        self._custom_logging = self.setup_logging()
        # Apply task execution optimizations
        # -- this will finalize the app!
        trace.setup_worker_optimizations(self.app, self.hostname)

    def on_start(self):
        """Run the program-level start-up sequence.

        In order: base ``on_start``, ``celeryd_after_setup`` signal,
        optional purge, optional banner, process title, platform tweaks,
        optional stdout redirection and the deprecated-settings warning.
        """
        app = self.app
        WorkController.on_start(self)

        # This signal can be used to, for example, change queues after
        # the -Q option has been applied.
        signals.celeryd_after_setup.send(
            sender=self.hostname, instance=self, conf=app.conf,
        )

        if self.purge:
            self.purge_messages()
        if not self.quiet:
            self.emit_banner()

        self.set_process_status('-active-')
        self.install_platform_tweaks(self)

        # Only redirect stdouts when logging was not customized.
        if not self._custom_logging and self.redirect_stdouts:
            app.log.redirect_stdouts(self.redirect_stdouts_level)

        # TODO: Remove the following code in Celery 6.0
        if app.conf.maybe_warn_deprecated_settings():
            logger.warning(
                "Please run `celery upgrade settings path/to/settings.py` "
                "to avoid these warnings and to allow a smoother upgrade "
                "to Celery 6.0."
            )

    def emit_banner(self):
        """Print the startup banner to the original stdout.

        Dumps the configuration to screen so there is some basic
        information available when users send bug reports.
        """
        use_image = term.supports_images()
        if use_image:
            print(term.imgcat(static.logo()))

        # The ASCII art is only merged in when no image logo was printed.
        colored = self.colored
        header = str(colored.cyan(
            ' \n', self.startup_info(artlines=not use_image),
        ))
        footer = str(colored.reset(self.extra_info() or ''))
        print(safe_str(''.join([header, footer])), file=sys.__stdout__)

    def on_consumer_ready(self, consumer):
        """Send the ``worker_ready`` signal and log that the worker is ready."""
        signals.worker_ready.send(sender=consumer)
        logger.info('%s ready.', safe_str(self.hostname))

    def setup_logging(self, colorize=None):
        """Configure logging through ``app.log.setup`` and return its result.

        When *colorize* is not given it is derived from ``self.no_color``
        (if that is set).
        """
        if colorize is None:
            no_color = self.no_color
            if no_color is not None:
                colorize = not no_color
        return self.app.log.setup(
            self.loglevel,
            self.logfile,
            redirect_stdouts=False,
            colorize=colorize,
            hostname=self.hostname,
        )

    def purge_messages(self):
        """Purge messages via ``app.control.purge`` and report the count."""
        with self.app.connection_for_write() as connection:
            count = self.app.control.purge(connection=connection)
            if count:  # pragma: no cover
                print(f"purge: Erased {count} {pluralize(count, 'message')} from the queue.\n")

    def tasklist(self, include_builtins=True, sep='\n', int_='celery.'):
        """Return the sorted task names, one bullet per task, joined by *sep*.

        Names starting with *int_* are left out unless *include_builtins*
        is true.
        """
        names = sorted(self.app.tasks)
        if include_builtins:
            selected = [name for name in names if name]
        else:
            selected = [name for name in names if not name.startswith(int_)]
        return sep.join(f' . {name}' for name in selected)

    def extra_info(self):
        """Return the ``[tasks]`` banner section, or ``None``.

        The section is only produced when the log level is set and is
        INFO or lower; built-in tasks are included at DEBUG or lower.
        """
        level = self.loglevel
        if level is None:
            return None
        if not (level <= logging.INFO):
            return None
        listing = self.tasklist(include_builtins=level <= logging.DEBUG)
        return EXTRA_INFO_FMT.format(tasks=listing)

    def startup_info(self, artlines=True):
        """Render the startup banner and return it as a single string.

        When *artlines* is true each banner line is prefixed with the
        matching ``ARTLINES`` entry (or 16 spaces once the art runs out).
        """
        app = self.app

        # Application label: "<main>:<id in hex>", plus the loader name
        # when a non-default loader is in use.
        app_label = '{}:{:#x}'.format(app.main or '__main__', id(app))
        if not isinstance(app.loader, AppLoader):
            loader_name = qualname(app.loader)
            if loader_name.startswith('celery.loaders'):  # pragma: no cover
                loader_name = loader_name[len('celery.loaders'):]
            app_label += f' ({loader_name})'

        # Concurrency label: fixed size or autoscale range, plus pool type.
        if self.autoscale:
            upper, lower = self.autoscale
            concurrency = f'{{min={lower}, max={upper}}}'
        else:
            concurrency = str(self.concurrency)
        pool = self.pool_cls
        pool_path = pool if isinstance(pool, str) else pool.__module__
        concurrency += f" ({pool_path.split('.')[-1]})"

        if self.task_events:
            events = 'ON'
        else:
            events = 'OFF (enable -E to monitor tasks in this worker)'

        banner = BANNER.format(
            app=app_label,
            hostname=safe_str(self.hostname),
            timestamp=datetime.now().replace(microsecond=0),
            version=VERSION_BANNER,
            conninfo=self.app.connection().as_uri(),
            results=self.app.backend.as_uri(),
            concurrency=concurrency,
            platform=safe_str(_platform.platform()),
            events=events,
            queues=app.amqp.queues.format(indent=0, indent_first=False),
        ).splitlines()

        # Integrate the ASCII art.
        if artlines:
            art_height = len(ARTLINES)
            banner = [
                ' '.join([ARTLINES[row], text]) if row < art_height
                else ' ' * 16 + text
                for row, text in enumerate(banner)
            ]
        return '\n'.join(banner) + '\n'

    def install_platform_tweaks(self, worker):
        """Install platform specific tweaks and workarounds."""
        if self.app.IS_macOS:
            self.macOS_proxy_detection_workaround()

        # Install signal handler so SIGHUP restarts the worker -- but only
        # if detached from a terminal, so closing the terminal window
        # doesn't restart the worker into the background.
        if not self._isatty:
            # macOS can't exec from a process using threads.
            # See https://github.com/celery/celery/issues#issue/152
            install_hup = (
                install_HUP_not_supported_handler if self.app.IS_macOS
                else install_worker_restart_handler
            )
            install_hup(worker)

        install_worker_term_handler(worker)
        install_worker_term_hard_handler(worker)
        install_worker_int_handler(worker)
        install_cry_handler()
        install_rdb_handler()

    def macOS_proxy_detection_workaround(self):
        """See https://github.com/celery/celery/issues#issue/161."""
        os.environ.setdefault('celery_dummy_proxy', 'set_by_celeryd')

    def set_process_status(self, info):
        """Set the process title to ``celeryd`` plus *info* and the argv."""
        details = f'{info} ({platforms.strargv(sys.argv)})'
        return platforms.set_mp_process_title(
            'celeryd', info=details, hostname=self.hostname,
        )
def _shutdown_handler(worker, sig='TERM', how='Warm',
                      exc=WorkerShutdown, callback=None, exitcode=EX_OK):
    """Register a shutdown handler for *sig* in ``platforms.signals``.

    When the handler fires in the process named ``MainProcess`` it calls
    *callback* (if any) with *worker*, announces the shutdown and sends
    the ``worker_shutting_down`` signal.  Then, in any process: if more
    than one non-dummy thread is active, *exitcode* is stored on
    ``celery.worker.state`` (``should_stop`` for ``'Warm'``,
    ``should_terminate`` for ``'Cold'``); otherwise ``exc(exitcode)``
    is raised directly.
    """
    def _handle_request(*args):
        with in_sighandler():
            from celery.worker import state

            in_main_process = current_process()._name == 'MainProcess'
            if in_main_process:
                if callback:
                    callback(worker)
                safe_say(f'worker: {how} shutdown (MainProcess)')
                signals.worker_shutting_down.send(
                    sender=worker.hostname,
                    sig=sig,
                    how=how,
                    exitcode=exitcode,
                )

            if active_thread_count() <= 1:
                raise exc(exitcode)
            flag_by_mode = {'Warm': 'should_stop', 'Cold': 'should_terminate'}
            setattr(state, flag_by_mode[how], exitcode)

    _handle_request.__name__ = f'worker_{how}'
    platforms.signals[sig] = _handle_request
# SIGTERM installer: a cold shutdown (WorkerTerminate / EX_FAILURE) when
# billiard's REMAP_SIGTERM is "SIGQUIT", a warm shutdown otherwise.
if REMAP_SIGTERM != "SIGQUIT":
    install_worker_term_handler = partial(
        _shutdown_handler,
        sig='SIGTERM', how='Warm', exc=WorkerShutdown,
    )
else:
    install_worker_term_handler = partial(
        _shutdown_handler,
        sig='SIGTERM', how='Cold', exc=WorkerTerminate, exitcode=EX_FAILURE,
    )

# SIGQUIT installer (cold shutdown).  On Jython both the SIGTERM and the
# SIGQUIT installers are replaced by the same do-nothing callable.
if is_jython:  # pragma: no cover
    install_worker_term_handler = install_worker_term_hard_handler = (
        lambda *a, **kw: None
    )
else:  # pragma: no cover
    install_worker_term_hard_handler = partial(
        _shutdown_handler,
        sig='SIGQUIT', how='Cold', exc=WorkerTerminate, exitcode=EX_FAILURE,
    )
def on_SIGINT(worker):
    """Warn the user, then rebind SIGINT to the cold-shutdown handler."""
    warning = 'worker: Hitting Ctrl+C again will terminate all running tasks!'
    safe_say(warning)
    install_worker_term_hard_handler(worker, sig='SIGINT')
# SIGINT installer: a shutdown handler with ``on_SIGINT`` as its callback
# and EX_FAILURE as exit code; a no-op on Jython.
if is_jython:  # pragma: no cover
    def install_worker_int_handler(*args, **kwargs):
        pass
else:  # pragma: no cover
    install_worker_int_handler = partial(
        _shutdown_handler,
        sig='SIGINT',
        callback=on_SIGINT,
        exitcode=EX_FAILURE,
    )
def _reload_current_worker():
    """Replace the current process with a fresh run of the same command.

    The list of the three original standard streams is handed to
    ``platforms.close_open_fds`` before ``os.execv`` re-executes the
    interpreter with the current ``sys.argv``.
    """
    std_streams = [sys.__stdin__, sys.__stdout__, sys.__stderr__]
    platforms.close_open_fds(std_streams)

    interpreter = sys.executable
    os.execv(interpreter, [interpreter, *sys.argv])
def install_worker_restart_handler(worker, sig='SIGHUP'):
    """Register a handler for *sig* that restarts the worker program.

    The handler schedules ``_reload_current_worker`` to run at interpreter
    exit and then asks the worker to stop by setting
    ``celery.worker.state.should_stop`` to ``EX_OK``.
    """
    def restart_worker_sig_handler(*args):
        """Signal handler restarting the current python program."""
        import atexit

        set_in_sighandler(True)
        command_line = ' '.join(sys.argv)
        safe_say(f"Restarting celery worker ({command_line})")
        atexit.register(_reload_current_worker)

        from celery.worker import state
        state.should_stop = EX_OK

    platforms.signals[sig] = restart_worker_sig_handler
def install_cry_handler(sig='SIGUSR1'):
    """Register a handler for *sig* that prints the output of ``cry()``.

    Nothing is installed on PyPy.
    """
    # PyPy does not have sys._current_frames
    if not is_pypy:  # pragma: no cover
        def cry_handler(*args):
            """Signal handler logging the stack-trace of all active threads."""
            with in_sighandler():
                report = cry()
                safe_say(report)

        platforms.signals[sig] = cry_handler
def install_rdb_handler(envvar='CELERY_RDBSIG',
                        sig='SIGUSR2'):  # pragma: no cover
    """Register an rdb breakpoint handler for *sig*.

    The handler is only installed when the environment variable named by
    *envvar* is set to a non-empty value.
    """
    if not os.environ.get(envvar):
        return

    def rdb_handler(*args):
        """Signal handler setting a rdb breakpoint at the current frame."""
        with in_sighandler():
            from celery.contrib.rdb import _frame, set_trace

            # gevent does not pass standard signal handler args
            if args:
                frame = args[1]
            else:
                frame = _frame().f_back
            set_trace(frame)

    platforms.signals[sig] = rdb_handler
def install_HUP_not_supported_handler(worker, sig='SIGHUP'):
    """Register a handler for *sig* that only prints a warning.

    Used instead of the restart handler; *worker* is accepted for
    signature parity with the other installers but is not used.
    """
    def warn_on_HUP_handler(signum, frame):
        with in_sighandler():
            message = (
                f'{sig} not supported: Restarting with {sig} is '
                'unstable on this platform!'
            )
            safe_say(message)

    platforms.signals[sig] = warn_on_HUP_handler