From 72a64618c40544f7489a9bb4ffe3472b86e0e935 Mon Sep 17 00:00:00 2001
From: Mike Perham
Date: Fri, 5 Apr 2019 16:02:07 -0700
Subject: [PATCH 1/2] Explicitly signal that we handled an exception with a
 retry, fixes #4138

Under just the right conditions, we could lose a job:

- Job raises an error
- Retry subsystem catches error and tries to create a retry in Redis
  but this raises a "Redis down" exception
- Processor catches Redis exception and thinks a retry was created
- Redis comes back online just in time for the job to be acknowledged
  and lost

That's a very specific and rare set of steps but it can happen.

Instead have the Retry subsystem raise a specific error signaling that
it created a retry. There will be three common cases:

1. Job is successful: job is acknowledged.
2. Job fails, retry is created, Processor rescues specific error: job
   is acknowledged.
3. Sidekiq::Shutdown is raised: job is not acknowledged.

Now there is another case:

4. Job fails, retry fails, Processor rescues Exception: job is NOT
   acknowledged. Sidekiq Pro's super_fetch will rescue the orphaned
   job at some point in the future.
---
 lib/sidekiq/job_retry.rb |  9 +++++----
 lib/sidekiq/processor.rb | 37 ++++++++++++++++++++++---------------
 2 files changed, 27 insertions(+), 19 deletions(-)

diff --git a/lib/sidekiq/job_retry.rb b/lib/sidekiq/job_retry.rb
index c67548f4c..66ea7eafd 100644
--- a/lib/sidekiq/job_retry.rb
+++ b/lib/sidekiq/job_retry.rb
@@ -56,7 +56,8 @@ module Sidekiq
   #     end
   #
   class JobRetry
-    class Skip < ::RuntimeError; end
+    class Handled < ::RuntimeError; end
+    class Skip < Handled; end
 
     include Sidekiq::Util
 
@@ -71,7 +72,7 @@ def initialize(options = {})
     # require the worker to be instantiated.
     def global(msg, queue)
       yield
-    rescue Skip => ex
+    rescue Handled => ex
       raise ex
     rescue Sidekiq::Shutdown => ey
       # ignore, will be pushed back onto queue during hard_shutdown
@@ -92,7 +93,7 @@ def global(msg, queue)
          end
        end
 
-      raise e
+      raise Handled
    end
 
 
@@ -106,7 +107,7 @@ def global(msg, queue)
    # calling the handle_exception handlers.
    def local(worker, msg, queue)
      yield
-    rescue Skip => ex
+    rescue Handled => ex
      raise ex
    rescue Sidekiq::Shutdown => ey
      # ignore, will be pushed back onto queue during hard_shutdown
diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb
index 82be39d5a..9f70ab005 100644
--- a/lib/sidekiq/processor.rb
+++ b/lib/sidekiq/processor.rb
@@ -147,21 +147,19 @@ def process(work)
       jobstr = work.job
       queue = work.queue_name
 
-      ack = false
+      # Treat malformed JSON as a special case: job goes straight to the morgue.
+      job_hash = nil
       begin
-        # Treat malformed JSON as a special case: job goes straight to the morgue.
-        job_hash = nil
-        begin
-          job_hash = Sidekiq.load_json(jobstr)
-        rescue => ex
-          handle_exception(ex, { :context => "Invalid JSON for job", :jobstr => jobstr })
-          # we can't notify because the job isn't a valid hash payload.
-          DeadSet.new.kill(jobstr, notify_failure: false)
-          ack = true
-          raise
-        end
+        job_hash = Sidekiq.load_json(jobstr)
+      rescue => ex
+        handle_exception(ex, { :context => "Invalid JSON for job", :jobstr => jobstr })
+        # we can't notify because the job isn't a valid hash payload.
+        DeadSet.new.kill(jobstr, notify_failure: false)
+        return work.acknowledge
+      end
 
-        ack = true
+      ack = true
+      begin
         dispatch(job_hash, queue) do |worker|
           Sidekiq.server_middleware.invoke(worker, job_hash, queue) do
             execute_job(worker, cloned(job_hash['args']))
@@ -172,10 +170,19 @@ def process(work)
         # within the timeout. Don't acknowledge the work since
         # we didn't properly finish it.
         ack = false
-      rescue Exception => ex
-        e = ex.is_a?(::Sidekiq::JobRetry::Skip) && ex.cause ? ex.cause : ex
+      rescue Sidekiq::JobRetry::Handled => h
+        # this is the common case: job raised error and Sidekiq::JobRetry::Handled
+        # signals that we created a retry successfully. We can acknowledge the job.
+        e = h.cause ? h.cause : h
         handle_exception(e, { :context => "Job raised exception", :job => job_hash, :jobstr => jobstr })
         raise e
+      rescue Exception => ex
+        # Unexpected error! This is very bad and indicates an exception that got past
+        # the retry subsystem (e.g. network partition). We won't acknowledge the job
+        # so it can be rescued when using Sidekiq Pro.
+        ack = false
+        handle_exception(ex, { :context => "Internal exception!", :job => job_hash, :jobstr => jobstr })
+        raise ex
       ensure
         work.acknowledge if ack
       end

From dea5c8e6e2b021b8697c47b99d05afcf80f61d07 Mon Sep 17 00:00:00 2001
From: Mike Perham
Date: Tue, 9 Apr 2019 10:27:24 -0700
Subject: [PATCH 2/2] changes

---
 Changes.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/Changes.md b/Changes.md
index e6a415b88..b84e4552f 100644
--- a/Changes.md
+++ b/Changes.md
@@ -5,6 +5,7 @@
 HEAD
 ---------
 
+- Fix edge case where a job failure during Redis outage could result in a lost job [#4141]
 - Better handling of malformed job arguments in payload [#4095]
 - Restore bootstap's dropdown css component [#4099, urkle]
 - Allow `Sidekiq::Worker#set` to be chained
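
The sketch below (not part of either patch) condenses the four acknowledgement
cases from the PATCH 1/2 commit message into one small runnable Ruby file. It
is a simplified illustration, not Sidekiq's actual code: Handled, Shutdown,
Work, retry_wrapper, handle_exception and process are local stand-ins assumed
here for Sidekiq::JobRetry::Handled, Sidekiq::Shutdown, a fetched work unit,
JobRetry#global/#local and Processor#process.

# sketch.rb - simplified model of the acknowledgement decision (assumed stand-ins)
class Handled < RuntimeError; end   # stand-in for Sidekiq::JobRetry::Handled
class Shutdown < Interrupt; end     # stand-in for Sidekiq::Shutdown

Work = Struct.new(:payload) do
  def acknowledge
    puts "acknowledged: #{payload}"
  end
end

def handle_exception(ex)
  warn "error: #{ex.class}: #{ex.message}"
end

# Stand-in for JobRetry#global/#local: pretend the retry was written to Redis,
# then raise Handled so the caller knows a retry exists. Ruby attaches the
# original job error as Handled#cause automatically.
def retry_wrapper
  yield
rescue Shutdown
  raise
rescue Exception
  raise Handled
end

def process(work)
  ack = true
  begin
    retry_wrapper { yield work.payload }
  rescue Shutdown
    ack = false                     # case 3: not acknowledged, re-fetched on restart
  rescue Handled => h
    handle_exception(h.cause || h)  # case 2: retry exists, safe to acknowledge
  rescue Exception => ex
    ack = false                     # case 4: retry creation itself failed;
    handle_exception(ex)            # leave the job unacknowledged for super_fetch
  ensure
    work.acknowledge if ack         # case 1: success falls through with ack = true
  end
end

process(Work.new("good job")) { |p| p }            # case 1: acknowledged
process(Work.new("failing job")) { raise "boom" }  # case 2: error logged, then acknowledged

The design point mirrored here is that acknowledgement defaults to true and is
withheld only when an error escapes the retry wrapper, so a successful retry
write never strands the job and a failed one never loses it.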