Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Explicitly signal that we handled an exception with a retry, fixes #4138 #4141

Merged
merged 2 commits into from Apr 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Changes.md
Expand Up @@ -5,6 +5,7 @@
HEAD
---------

- Fix edge case where a job failure during Redis outage could result in a lost job [#4141]
- Better handling of malformed job arguments in payload [#4095]
- Restore bootstap's dropdown css component [#4099, urkle]
- Allow `Sidekiq::Worker#set` to be chained
Expand Down
9 changes: 5 additions & 4 deletions lib/sidekiq/job_retry.rb
Expand Up @@ -56,7 +56,8 @@ module Sidekiq
# end
#
class JobRetry
class Skip < ::RuntimeError; end
class Handled < ::RuntimeError; end
class Skip < Handled; end

include Sidekiq::Util

Expand All @@ -71,7 +72,7 @@ def initialize(options = {})
# require the worker to be instantiated.
def global(msg, queue)
yield
rescue Skip => ex
rescue Handled => ex
raise ex
rescue Sidekiq::Shutdown => ey
# ignore, will be pushed back onto queue during hard_shutdown
Expand All @@ -92,7 +93,7 @@ def global(msg, queue)
end
end

raise e
raise Handled
end


Expand All @@ -106,7 +107,7 @@ def global(msg, queue)
# calling the handle_exception handlers.
def local(worker, msg, queue)
yield
rescue Skip => ex
rescue Handled => ex
raise ex
rescue Sidekiq::Shutdown => ey
# ignore, will be pushed back onto queue during hard_shutdown
Expand Down
37 changes: 22 additions & 15 deletions lib/sidekiq/processor.rb
Expand Up @@ -147,21 +147,19 @@ def process(work)
jobstr = work.job
queue = work.queue_name

ack = false
# Treat malformed JSON as a special case: job goes straight to the morgue.
job_hash = nil
begin
# Treat malformed JSON as a special case: job goes straight to the morgue.
job_hash = nil
begin
job_hash = Sidekiq.load_json(jobstr)
rescue => ex
handle_exception(ex, { :context => "Invalid JSON for job", :jobstr => jobstr })
# we can't notify because the job isn't a valid hash payload.
DeadSet.new.kill(jobstr, notify_failure: false)
ack = true
raise
end
job_hash = Sidekiq.load_json(jobstr)
rescue => ex
handle_exception(ex, { :context => "Invalid JSON for job", :jobstr => jobstr })
# we can't notify because the job isn't a valid hash payload.
DeadSet.new.kill(jobstr, notify_failure: false)
return work.acknowledge
end

ack = true
ack = true
begin
dispatch(job_hash, queue) do |worker|
Sidekiq.server_middleware.invoke(worker, job_hash, queue) do
execute_job(worker, cloned(job_hash['args']))
Expand All @@ -172,10 +170,19 @@ def process(work)
# within the timeout. Don't acknowledge the work since
# we didn't properly finish it.
ack = false
rescue Exception => ex
e = ex.is_a?(::Sidekiq::JobRetry::Skip) && ex.cause ? ex.cause : ex
rescue Sidekiq::JobRetry::Handled => h
# this is the common case: job raised error and Sidekiq::JobRetry::Handled
# signals that we created a retry successfully. We can acknowlege the job.
e = h.cause ? h.cause : h
handle_exception(e, { :context => "Job raised exception", :job => job_hash, :jobstr => jobstr })
raise e
rescue Exception => ex
# Unexpected error! This is very bad and indicates an exception that got past
# the retry subsystem (e.g. network partition). We won't acknowledge the job
# so it can be rescued when using Sidekiq Pro.
ack = false
handle_exception(ex, { :context => "Internal exception!", :job => job_hash, :jobstr => jobstr })
raise e
ensure
work.acknowledge if ack
end
Expand Down