Skip to content

Commit

Permalink
The celery multi command now works as expected. (#6388)
Browse files Browse the repository at this point in the history
  • Loading branch information
thedrow committed Oct 7, 2020
1 parent 844774b commit 8a92b71
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 23 deletions.
26 changes: 23 additions & 3 deletions celery/apps/multi.py
Expand Up @@ -150,7 +150,7 @@ def _setdefaultopt(self, d, alt, value):
pass
value = d.setdefault(alt[0], os.path.normpath(value))
dir_path = os.path.dirname(value)
if not os.path.exists(dir_path):
if dir_path and not os.path.exists(dir_path):
os.makedirs(dir_path)
return value

Expand All @@ -160,10 +160,30 @@ def _prepare_expander(self):
self.name, shortname, hostname)

def _prepare_argv(self):
cmd = self.expander(self.cmd).split(' ')
i = cmd.index('celery') + 1

options = self.options.copy()
for opt, value in self.options.items():
if opt in (
'-A', '--app',
'-b', '--broker',
'--result-backend',
'--loader',
'--config',
'--workdir',
'-C', '--no-color',
'-q', '--quiet',
):
cmd.insert(i, format_opt(opt, self.expander(value)))

options.pop(opt)

cmd = [' '.join(cmd)]
argv = tuple(
[self.expander(self.cmd)] +
cmd +
[format_opt(opt, self.expander(value))
for opt, value in self.options.items()] +
for opt, value in options.items()] +
[self.extra_args]
)
if self.append:
Expand Down
7 changes: 6 additions & 1 deletion celery/bin/multi.py
Expand Up @@ -471,4 +471,9 @@ def DOWN(self):
def multi(ctx):
"""Start multiple worker instances."""
cmd = MultiTool(quiet=ctx.obj.quiet, no_color=ctx.obj.no_color)
return cmd.execute_from_commandline([''] + ctx.args)
# In 4.x, celery multi ignores the global --app option.
# Since in 5.0 the --app option is global only we
# rearrange the arguments so that the MultiTool will parse them correctly.
args = sys.argv[1:]
args = args[args.index('multi'):] + args[:args.index('multi')]
return cmd.execute_from_commandline(args)
2 changes: 1 addition & 1 deletion requirements/test-ci-default.txt
Expand Up @@ -12,7 +12,7 @@
-r extras/thread.txt
-r extras/elasticsearch.txt
-r extras/couchdb.txt
-r extras/couchbase.txt
#-r extras/couchbase.txt
-r extras/arangodb.txt
-r extras/consul.txt
-r extras/cosmosdbsql.txt
Expand Down
35 changes: 17 additions & 18 deletions t/unit/apps/test_multi.py
Expand Up @@ -69,7 +69,7 @@ def test_parse(self, gethostname, mkdirs_mock):
'--', '.disable_rate_limits=1',
])
p.parse()
it = multi_args(p, cmd='COMMAND', append='*AP*',
it = multi_args(p, cmd='celery multi', append='*AP*',
prefix='*P*', suffix='*S*')
nodes = list(it)

Expand All @@ -85,32 +85,32 @@ def assert_line_in(name, args):

assert_line_in(
'*P*jerry@*S*',
['COMMAND', '-n *P*jerry@*S*', '-Q bar',
['celery multi', '-n *P*jerry@*S*', '-Q bar',
'-c 5', '--flag', '--logfile=/var/log/celery/foo',
'-- .disable_rate_limits=1', '*AP*'],
)
assert_line_in(
'*P*elaine@*S*',
['COMMAND', '-n *P*elaine@*S*', '-Q bar',
['celery multi', '-n *P*elaine@*S*', '-Q bar',
'-c 5', '--flag', '--logfile=/var/log/celery/foo',
'-- .disable_rate_limits=1', '*AP*'],
)
assert_line_in(
'*P*kramer@*S*',
['COMMAND', '--loglevel=DEBUG', '-n *P*kramer@*S*',
['celery multi', '--loglevel=DEBUG', '-n *P*kramer@*S*',
'-Q bar', '--flag', '--logfile=/var/log/celery/foo',
'-- .disable_rate_limits=1', '*AP*'],
)
expand = nodes[0].expander
assert expand('%h') == '*P*jerry@*S*'
assert expand('%n') == '*P*jerry'
nodes2 = list(multi_args(p, cmd='COMMAND', append='',
nodes2 = list(multi_args(p, cmd='celery multi', append='',
prefix='*P*', suffix='*S*'))
assert nodes2[0].argv[-1] == '-- .disable_rate_limits=1'

p2 = NamespacedOptionParser(['10', '-c:1', '5'])
p2.parse()
nodes3 = list(multi_args(p2, cmd='COMMAND'))
nodes3 = list(multi_args(p2, cmd='celery multi'))

def _args(name, *args):
return args + (
Expand All @@ -123,40 +123,40 @@ def _args(name, *args):
assert len(nodes3) == 10
assert nodes3[0].name == 'celery1@example.com'
assert nodes3[0].argv == (
'COMMAND', '-c 5', '-n celery1@example.com') + _args('celery1')
'celery multi', '-c 5', '-n celery1@example.com') + _args('celery1')
for i, worker in enumerate(nodes3[1:]):
assert worker.name == 'celery%s@example.com' % (i + 2)
node_i = f'celery{i + 2}'
assert worker.argv == (
'COMMAND',
'celery multi',
f'-n {node_i}@example.com') + _args(node_i)

nodes4 = list(multi_args(p2, cmd='COMMAND', suffix='""'))
nodes4 = list(multi_args(p2, cmd='celery multi', suffix='""'))
assert len(nodes4) == 10
assert nodes4[0].name == 'celery1@'
assert nodes4[0].argv == (
'COMMAND', '-c 5', '-n celery1@') + _args('celery1')
'celery multi', '-c 5', '-n celery1@') + _args('celery1')

p3 = NamespacedOptionParser(['foo@', '-c:foo', '5'])
p3.parse()
nodes5 = list(multi_args(p3, cmd='COMMAND', suffix='""'))
nodes5 = list(multi_args(p3, cmd='celery multi', suffix='""'))
assert nodes5[0].name == 'foo@'
assert nodes5[0].argv == (
'COMMAND', '-c 5', '-n foo@') + _args('foo')
'celery multi', '-c 5', '-n foo@') + _args('foo')

p4 = NamespacedOptionParser(['foo', '-Q:1', 'test'])
p4.parse()
nodes6 = list(multi_args(p4, cmd='COMMAND', suffix='""'))
nodes6 = list(multi_args(p4, cmd='celery multi', suffix='""'))
assert nodes6[0].name == 'foo@'
assert nodes6[0].argv == (
'COMMAND', '-Q test', '-n foo@') + _args('foo')
'celery multi', '-Q test', '-n foo@') + _args('foo')

p5 = NamespacedOptionParser(['foo@bar', '-Q:1', 'test'])
p5.parse()
nodes7 = list(multi_args(p5, cmd='COMMAND', suffix='""'))
nodes7 = list(multi_args(p5, cmd='celery multi', suffix='""'))
assert nodes7[0].name == 'foo@bar'
assert nodes7[0].argv == (
'COMMAND', '-Q test', '-n foo@bar') + _args('foo')
'celery multi', '-Q test', '-n foo@bar') + _args('foo')

p6 = NamespacedOptionParser(['foo@bar', '-Q:0', 'test'])
p6.parse()
Expand Down Expand Up @@ -192,8 +192,7 @@ def test_from_kwargs(self):
max_tasks_per_child=30, A='foo', Q='q1,q2', O='fair',
)
assert sorted(n.argv) == sorted([
'-m celery worker --detach',
'-A foo',
'-m celery -A foo worker --detach',
f'--executable={n.executable}',
'-O fair',
'-n foo@bar.com',
Expand Down

0 comments on commit 8a92b71

Please sign in to comment.