Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The celery multi command now works as expected #6388

Merged
merged 1 commit into from Oct 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 23 additions & 3 deletions celery/apps/multi.py
Expand Up @@ -150,7 +150,7 @@ def _setdefaultopt(self, d, alt, value):
pass
value = d.setdefault(alt[0], os.path.normpath(value))
dir_path = os.path.dirname(value)
if not os.path.exists(dir_path):
if dir_path and not os.path.exists(dir_path):
os.makedirs(dir_path)
return value

Expand All @@ -160,10 +160,30 @@ def _prepare_expander(self):
self.name, shortname, hostname)

def _prepare_argv(self):
cmd = self.expander(self.cmd).split(' ')
i = cmd.index('celery') + 1

options = self.options.copy()
for opt, value in self.options.items():
if opt in (
'-A', '--app',
'-b', '--broker',
'--result-backend',
'--loader',
'--config',
'--workdir',
'-C', '--no-color',
'-q', '--quiet',
):
cmd.insert(i, format_opt(opt, self.expander(value)))

options.pop(opt)

cmd = [' '.join(cmd)]
argv = tuple(
[self.expander(self.cmd)] +
cmd +
[format_opt(opt, self.expander(value))
for opt, value in self.options.items()] +
for opt, value in options.items()] +
[self.extra_args]
)
if self.append:
Expand Down
7 changes: 6 additions & 1 deletion celery/bin/multi.py
Expand Up @@ -471,4 +471,9 @@ def DOWN(self):
def multi(ctx):
"""Start multiple worker instances."""
cmd = MultiTool(quiet=ctx.obj.quiet, no_color=ctx.obj.no_color)
return cmd.execute_from_commandline([''] + ctx.args)
# In 4.x, celery multi ignores the global --app option.
# Since in 5.0 the --app option is global only we
# rearrange the arguments so that the MultiTool will parse them correctly.
args = sys.argv[1:]
args = args[args.index('multi'):] + args[:args.index('multi')]
return cmd.execute_from_commandline(args)
2 changes: 1 addition & 1 deletion requirements/test-ci-default.txt
Expand Up @@ -12,7 +12,7 @@
-r extras/thread.txt
-r extras/elasticsearch.txt
-r extras/couchdb.txt
-r extras/couchbase.txt
#-r extras/couchbase.txt
-r extras/arangodb.txt
-r extras/consul.txt
-r extras/cosmosdbsql.txt
Expand Down
35 changes: 17 additions & 18 deletions t/unit/apps/test_multi.py
Expand Up @@ -69,7 +69,7 @@ def test_parse(self, gethostname, mkdirs_mock):
'--', '.disable_rate_limits=1',
])
p.parse()
it = multi_args(p, cmd='COMMAND', append='*AP*',
it = multi_args(p, cmd='celery multi', append='*AP*',
prefix='*P*', suffix='*S*')
nodes = list(it)

Expand All @@ -85,32 +85,32 @@ def assert_line_in(name, args):

assert_line_in(
'*P*jerry@*S*',
['COMMAND', '-n *P*jerry@*S*', '-Q bar',
['celery multi', '-n *P*jerry@*S*', '-Q bar',
'-c 5', '--flag', '--logfile=/var/log/celery/foo',
'-- .disable_rate_limits=1', '*AP*'],
)
assert_line_in(
'*P*elaine@*S*',
['COMMAND', '-n *P*elaine@*S*', '-Q bar',
['celery multi', '-n *P*elaine@*S*', '-Q bar',
'-c 5', '--flag', '--logfile=/var/log/celery/foo',
'-- .disable_rate_limits=1', '*AP*'],
)
assert_line_in(
'*P*kramer@*S*',
['COMMAND', '--loglevel=DEBUG', '-n *P*kramer@*S*',
['celery multi', '--loglevel=DEBUG', '-n *P*kramer@*S*',
'-Q bar', '--flag', '--logfile=/var/log/celery/foo',
'-- .disable_rate_limits=1', '*AP*'],
)
expand = nodes[0].expander
assert expand('%h') == '*P*jerry@*S*'
assert expand('%n') == '*P*jerry'
nodes2 = list(multi_args(p, cmd='COMMAND', append='',
nodes2 = list(multi_args(p, cmd='celery multi', append='',
prefix='*P*', suffix='*S*'))
assert nodes2[0].argv[-1] == '-- .disable_rate_limits=1'

p2 = NamespacedOptionParser(['10', '-c:1', '5'])
p2.parse()
nodes3 = list(multi_args(p2, cmd='COMMAND'))
nodes3 = list(multi_args(p2, cmd='celery multi'))

def _args(name, *args):
return args + (
Expand All @@ -123,40 +123,40 @@ def _args(name, *args):
assert len(nodes3) == 10
assert nodes3[0].name == 'celery1@example.com'
assert nodes3[0].argv == (
'COMMAND', '-c 5', '-n celery1@example.com') + _args('celery1')
'celery multi', '-c 5', '-n celery1@example.com') + _args('celery1')
for i, worker in enumerate(nodes3[1:]):
assert worker.name == 'celery%s@example.com' % (i + 2)
node_i = f'celery{i + 2}'
assert worker.argv == (
'COMMAND',
'celery multi',
f'-n {node_i}@example.com') + _args(node_i)

nodes4 = list(multi_args(p2, cmd='COMMAND', suffix='""'))
nodes4 = list(multi_args(p2, cmd='celery multi', suffix='""'))
assert len(nodes4) == 10
assert nodes4[0].name == 'celery1@'
assert nodes4[0].argv == (
'COMMAND', '-c 5', '-n celery1@') + _args('celery1')
'celery multi', '-c 5', '-n celery1@') + _args('celery1')

p3 = NamespacedOptionParser(['foo@', '-c:foo', '5'])
p3.parse()
nodes5 = list(multi_args(p3, cmd='COMMAND', suffix='""'))
nodes5 = list(multi_args(p3, cmd='celery multi', suffix='""'))
assert nodes5[0].name == 'foo@'
assert nodes5[0].argv == (
'COMMAND', '-c 5', '-n foo@') + _args('foo')
'celery multi', '-c 5', '-n foo@') + _args('foo')

p4 = NamespacedOptionParser(['foo', '-Q:1', 'test'])
p4.parse()
nodes6 = list(multi_args(p4, cmd='COMMAND', suffix='""'))
nodes6 = list(multi_args(p4, cmd='celery multi', suffix='""'))
assert nodes6[0].name == 'foo@'
assert nodes6[0].argv == (
'COMMAND', '-Q test', '-n foo@') + _args('foo')
'celery multi', '-Q test', '-n foo@') + _args('foo')

p5 = NamespacedOptionParser(['foo@bar', '-Q:1', 'test'])
p5.parse()
nodes7 = list(multi_args(p5, cmd='COMMAND', suffix='""'))
nodes7 = list(multi_args(p5, cmd='celery multi', suffix='""'))
assert nodes7[0].name == 'foo@bar'
assert nodes7[0].argv == (
'COMMAND', '-Q test', '-n foo@bar') + _args('foo')
'celery multi', '-Q test', '-n foo@bar') + _args('foo')

p6 = NamespacedOptionParser(['foo@bar', '-Q:0', 'test'])
p6.parse()
Expand Down Expand Up @@ -192,8 +192,7 @@ def test_from_kwargs(self):
max_tasks_per_child=30, A='foo', Q='q1,q2', O='fair',
)
assert sorted(n.argv) == sorted([
'-m celery worker --detach',
'-A foo',
'-m celery -A foo worker --detach',
f'--executable={n.executable}',
'-O fair',
'-n foo@bar.com',
Expand Down