/
celery.py
178 lines (150 loc) · 5.2 KB
/
celery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
"""Celery Command Line Interface."""
import os
import click
import click.exceptions
from click.types import ParamType
from click_didyoumean import DYMGroup
from celery import VERSION_BANNER
from celery.app.utils import find_app
from celery.bin.amqp import amqp
from celery.bin.base import CeleryCommand, CeleryOption, CLIContext
from celery.bin.beat import beat
from celery.bin.call import call
from celery.bin.control import control, inspect, status
from celery.bin.events import events
from celery.bin.graph import graph
from celery.bin.list import list_
from celery.bin.logtool import logtool
from celery.bin.migrate import migrate
from celery.bin.multi import multi
from celery.bin.purge import purge
from celery.bin.result import result
from celery.bin.shell import shell
from celery.bin.upgrade import upgrade
from celery.bin.worker import worker
class App(ParamType):
    """Application option.

    Click parameter type that resolves the value given on the command
    line into an application object by calling ``find_app``.
    """

    name = "application"

    def convert(self, value, param, ctx):
        """Look up the application named by *value*.

        :param value: The raw option value, passed to ``find_app``.
        :param param: The click parameter being converted (unused here).
        :param ctx: The current click context (unused here).
        :return: Whatever ``find_app(value)`` returns.
        """
        try:
            resolved_app = find_app(value)
        except (ModuleNotFoundError, AttributeError) as exc:
            # Report lookup failures through click's parameter error
            # mechanism, using the exception text as the message.
            self.fail(str(exc))
        else:
            return resolved_app
# Module-level ``App`` instance; passed as ``type=APP`` to the ``-A/--app``
# option of the ``celery`` group.
APP = App()
@click.group(cls=DYMGroup, invoke_without_command=True)
@click.option('-A', '--app',
              envvar='APP',
              cls=CeleryOption,
              type=APP,
              help_group="Global Options")
@click.option('-b', '--broker',
              envvar='BROKER_URL',
              cls=CeleryOption,
              help_group="Global Options")
@click.option('--result-backend',
              envvar='RESULT_BACKEND',
              cls=CeleryOption,
              help_group="Global Options")
@click.option('--loader',
              envvar='LOADER',
              cls=CeleryOption,
              help_group="Global Options")
@click.option('--config',
              envvar='CONFIG_MODULE',
              cls=CeleryOption,
              help_group="Global Options")
@click.option('--workdir',
              cls=CeleryOption,
              help_group="Global Options")
@click.option('-C', '--no-color',
              envvar='NO_COLOR',
              is_flag=True,
              cls=CeleryOption,
              help_group="Global Options")
@click.option('-q', '--quiet',
              is_flag=True,
              cls=CeleryOption,
              help_group="Global Options")
@click.option('--version',
              cls=CeleryOption,
              is_flag=True,
              help_group="Global Options")
@click.pass_context
def celery(ctx, app, broker, result_backend, loader, config, workdir,
           no_color, quiet, version):
    """Celery command entrypoint."""
    # NOTE: the docstring above is kept verbatim because click uses it as
    # the group's help text.

    # ``--version`` and a bare ``celery`` invocation both print and exit
    # before any global state is touched.
    if version:
        click.echo(VERSION_BANNER)
        ctx.exit()
    elif ctx.invoked_subcommand is None:
        click.echo(ctx.get_help())
        ctx.exit()

    if workdir:
        os.chdir(workdir)

    # Export the global options that were supplied as ``CELERY_*``
    # environment variables.
    exported_settings = (
        # Default app takes loader from this env (Issue #1066).
        ('CELERY_LOADER', loader),
        ('CELERY_BROKER_URL', broker),
        ('CELERY_RESULT_BACKEND', result_backend),
        ('CELERY_CONFIG_MODULE', config),
    )
    for env_name, env_value in exported_settings:
        if env_value:
            os.environ[env_name] = env_value

    ctx.obj = CLIContext(
        app=app,
        no_color=no_color,
        workdir=workdir,
        quiet=quiet,
    )

    # User options
    commands_with_user_options = (
        (worker, 'worker'),
        (beat, 'beat'),
        (events, 'events'),
    )
    for command, section in commands_with_user_options:
        command.params.extend(ctx.obj.app.user_options.get(section, []))
@celery.command(cls=CeleryCommand)
@click.pass_context
def report(ctx):
    """Shows information useful to include in bug-reports."""
    # NOTE: the docstring above is kept verbatim because click uses it as
    # the command's help text.
    cli_context = ctx.obj
    current_app = cli_context.app
    # Import the default modules through the app's loader before building
    # the report.
    current_app.loader.import_default_modules()
    cli_context.echo(current_app.bugreport())
# Register the imported sub-commands on the ``celery`` group, preserving the
# original registration order.
for _subcommand in (
    purge,
    call,
    beat,
    list_,
    result,
    migrate,
    status,
    worker,
    events,
    inspect,
    control,
    graph,
    upgrade,
    logtool,
    amqp,
    shell,
    multi,
):
    celery.add_command(_subcommand)
# Do not leave the loop variable behind in the module namespace.
del _subcommand
# Monkey-patch click to display a custom error
# when -A or --app are used as sub-command options instead of as options
# of the global command.
# Keep a reference to click's own ``NoSuchOption.show`` so that the
# replacement (``_show``) can still delegate to it.
previous_show_implementation = click.exceptions.NoSuchOption.show
# Hint template; ``{option_name}`` and ``{info_name}`` are substituted with
# ``str.format`` inside ``_show``.
WRONG_APP_OPTION_USAGE_MESSAGE = """You are using `{option_name}` as an option of the {info_name} sub-command:
celery {info_name} {option_name} celeryapp <...>
This was removed in Celery 5.0. Instead you should use `{option_name}` as a global option:
celery {option_name} celeryapp {info_name} <...>"""
def _show(self, file=None):
    """Replacement for ``click.exceptions.NoSuchOption.show``.

    When the unknown option is ``-A`` or ``--app``, first print a hint
    explaining that the option must be given to the global ``celery``
    command rather than to a sub-command, then fall through to click's
    original output.

    :param file: Forwarded unchanged to the original ``show``
        implementation.
    """
    ctx = self.ctx
    # This patch is installed on click's exception class itself, so the
    # exception may carry no context at all, or a context whose ``obj`` is
    # not a CLI context with an ``error`` method.  Only print the hint
    # when it can actually be reported; never fail while showing an error.
    report_error = getattr(getattr(ctx, 'obj', None), 'error', None)
    if self.option_name in ('-A', '--app') and callable(report_error):
        report_error(
            WRONG_APP_OPTION_USAGE_MESSAGE.format(
                option_name=self.option_name,
                info_name=ctx.info_name),
            fg='red'
        )
    previous_show_implementation(self, file=file)
# Install the replacement on click's ``NoSuchOption`` class so that showing
# such an error goes through ``_show``.
click.exceptions.NoSuchOption.show = _show
def main() -> int:
    """Run the celery umbrella command.

    This is the main entrypoint for the CLI.  It invokes the ``celery``
    click group with ``auto_envvar_prefix`` set to ``"CELERY"``.

    :return: The exit code of the CLI.
    """
    exit_code = celery(auto_envvar_prefix="CELERY")
    return exit_code