Skip to content

Commit

Permalink
events bootstep disabled if no events (celery#5807)
Browse files Browse the repository at this point in the history
* events bootstep disabled if no events

* Added unit tests.
  • Loading branch information
anilaratna2 authored and jeyrce committed Aug 25, 2021
1 parent cc72780 commit ce33cae
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 4 deletions.
1 change: 1 addition & 0 deletions celery/worker/consumer/events.py
Expand Up @@ -29,6 +29,7 @@ def __init__(self, c,
not without_gossip or
not without_heartbeat
)
self.enabled = self.send_events
c.event_dispatcher = None
super(Events, self).__init__(c, **kwargs)

Expand Down
2 changes: 1 addition & 1 deletion celery/worker/strategy.py
Expand Up @@ -116,7 +116,7 @@ def default(task, app, consumer,
# (optimized to avoid calling request.send_event)
eventer = consumer.event_dispatcher
events = eventer and eventer.enabled
send_event = eventer.send
send_event = eventer and eventer.send
task_sends_events = events and task.send_events

call_at = consumer.timer.call_at
Expand Down
20 changes: 17 additions & 3 deletions t/unit/bin/test_worker.py
Expand Up @@ -70,6 +70,7 @@ def test_run_from_argv_basic(self):

def run(*args, **kwargs):
pass

x.run = run
x.run_from_argv('celery', [])
x.maybe_detach.assert_called()
Expand Down Expand Up @@ -210,10 +211,10 @@ def test_init_queues(self):
assert 'celery' not in app.amqp.queues.consume_from

c.task_create_missing_queues = False
del(app.amqp.queues)
del (app.amqp.queues)
with pytest.raises(ImproperlyConfigured):
self.Worker(app=self.app).setup_queues(['image'])
del(app.amqp.queues)
del (app.amqp.queues)
c.task_create_missing_queues = True
worker = self.Worker(app=self.app)
worker.setup_queues(['image'])
Expand Down Expand Up @@ -374,6 +375,20 @@ def on_worker_ready(**kwargs):
self.Worker(app=self.app).on_consumer_ready(object())
assert worker_ready_sent[0]

def test_disable_task_events(self):
worker = self.Worker(app=self.app, task_events=False,
without_gossip=True,
without_heartbeat=True)
consumer_steps = worker.blueprint.steps['celery.worker.components.Consumer'].obj.steps
assert not any(True for step in consumer_steps
if step.alias == 'Events')

def test_enable_task_events(self):
worker = self.Worker(app=self.app, task_events=True)
consumer_steps = worker.blueprint.steps['celery.worker.components.Consumer'].obj.steps
assert any(True for step in consumer_steps
if step.alias == 'Events')


@mock.stdouts
class test_funs:
Expand Down Expand Up @@ -422,7 +437,6 @@ def test_main(self):

@mock.stdouts
class test_signal_handlers:

class _Worker(object):
hostname = 'foo'
stopped = False
Expand Down

0 comments on commit ce33cae

Please sign in to comment.