diff --git a/Changes.md b/Changes.md index e6a415b88..b84e4552f 100644 --- a/Changes.md +++ b/Changes.md @@ -5,6 +5,7 @@ HEAD --------- +- Fix edge case where a job failure during Redis outage could result in a lost job [#4141] - Better handling of malformed job arguments in payload [#4095] - Restore bootstap's dropdown css component [#4099, urkle] - Allow `Sidekiq::Worker#set` to be chained diff --git a/lib/sidekiq/job_retry.rb b/lib/sidekiq/job_retry.rb index c67548f4c..66ea7eafd 100644 --- a/lib/sidekiq/job_retry.rb +++ b/lib/sidekiq/job_retry.rb @@ -56,7 +56,8 @@ module Sidekiq # end # class JobRetry - class Skip < ::RuntimeError; end + class Handled < ::RuntimeError; end + class Skip < Handled; end include Sidekiq::Util @@ -71,7 +72,7 @@ def initialize(options = {}) # require the worker to be instantiated. def global(msg, queue) yield - rescue Skip => ex + rescue Handled => ex raise ex rescue Sidekiq::Shutdown => ey # ignore, will be pushed back onto queue during hard_shutdown @@ -92,7 +93,7 @@ def global(msg, queue) end end - raise e + raise Handled end @@ -106,7 +107,7 @@ def global(msg, queue) # calling the handle_exception handlers. def local(worker, msg, queue) yield - rescue Skip => ex + rescue Handled => ex raise ex rescue Sidekiq::Shutdown => ey # ignore, will be pushed back onto queue during hard_shutdown diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb index 82be39d5a..9f70ab005 100644 --- a/lib/sidekiq/processor.rb +++ b/lib/sidekiq/processor.rb @@ -147,21 +147,19 @@ def process(work) jobstr = work.job queue = work.queue_name - ack = false + # Treat malformed JSON as a special case: job goes straight to the morgue. + job_hash = nil begin - # Treat malformed JSON as a special case: job goes straight to the morgue. - job_hash = nil - begin - job_hash = Sidekiq.load_json(jobstr) - rescue => ex - handle_exception(ex, { :context => "Invalid JSON for job", :jobstr => jobstr }) - # we can't notify because the job isn't a valid hash payload. - DeadSet.new.kill(jobstr, notify_failure: false) - ack = true - raise - end + job_hash = Sidekiq.load_json(jobstr) + rescue => ex + handle_exception(ex, { :context => "Invalid JSON for job", :jobstr => jobstr }) + # we can't notify because the job isn't a valid hash payload. + DeadSet.new.kill(jobstr, notify_failure: false) + return work.acknowledge + end - ack = true + ack = true + begin dispatch(job_hash, queue) do |worker| Sidekiq.server_middleware.invoke(worker, job_hash, queue) do execute_job(worker, cloned(job_hash['args'])) @@ -172,10 +170,19 @@ def process(work) # within the timeout. Don't acknowledge the work since # we didn't properly finish it. ack = false - rescue Exception => ex - e = ex.is_a?(::Sidekiq::JobRetry::Skip) && ex.cause ? ex.cause : ex + rescue Sidekiq::JobRetry::Handled => h + # this is the common case: job raised error and Sidekiq::JobRetry::Handled + # signals that we created a retry successfully. We can acknowlege the job. + e = h.cause ? h.cause : h handle_exception(e, { :context => "Job raised exception", :job => job_hash, :jobstr => jobstr }) raise e + rescue Exception => ex + # Unexpected error! This is very bad and indicates an exception that got past + # the retry subsystem (e.g. network partition). We won't acknowledge the job + # so it can be rescued when using Sidekiq Pro. + ack = false + handle_exception(ex, { :context => "Internal exception!", :job => job_hash, :jobstr => jobstr }) + raise e ensure work.acknowledge if ack end