Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SQS - Reject on failure #5843

Merged
merged 5 commits into from Nov 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions celery/worker/request.py
Expand Up @@ -510,6 +510,10 @@ def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
send_failed_event = False
elif ack:
self.acknowledge()
else:
# supporting the behaviour where a task failed and
# need to be removed from prefetched local queue
self.reject(requeue=False)

# These are special cases where the process would not have had time
# to write the result.
Expand Down
9 changes: 7 additions & 2 deletions t/unit/worker/test_request.py
Expand Up @@ -669,14 +669,17 @@ def test_on_failure_acks_late(self):
def test_on_failure_acks_on_failure_or_timeout_disabled_for_task(self):
job = self.xRequest()
job.time_start = 1
job._on_reject = Mock()
self.mytask.acks_late = True
self.mytask.acks_on_failure_or_timeout = False
try:
raise KeyError('foo')
except KeyError:
exc_info = ExceptionInfo()
job.on_failure(exc_info)
assert job.acknowledged is False

assert job.acknowledged is True
job._on_reject.assert_called_with(req_logger, job.connection_errors, False)

def test_on_failure_acks_on_failure_or_timeout_enabled_for_task(self):
job = self.xRequest()
Expand All @@ -701,7 +704,9 @@ def test_on_failure_acks_on_failure_or_timeout_disabled(self):
except KeyError:
exc_info = ExceptionInfo()
job.on_failure(exc_info)
assert job.acknowledged is False
assert job.acknowledged is True
job._on_reject.assert_called_with(req_logger, job.connection_errors,
False)
self.app.conf.acks_on_failure_or_timeout = True

def test_on_failure_acks_on_failure_or_timeout_enabled(self):
Expand Down