Explicitly signal that we handled an exception with a retry, fixes #4138 (#4141)

Under just the right conditions, we could lose a job:

- Job raises an error
- Retry subsystem catches the error and tries to create a retry in Redis, but this raises a "Redis down" exception
- Processor catches Redis exception and thinks a retry was created
- Redis comes back online just in time for the job to be acknowledged and lost

That's a very specific and rare set of steps, but it can happen.

Instead, have the Retry subsystem raise a specific error signaling that it created a retry. There will be three common cases:

1. Job is successful: job is acknowledged.
2. Job fails, retry is created, Processor rescues specific error: job is acknowledged.
3. Sidekiq::Shutdown is raised: job is not acknowledged.

Now there is another case:

4. Job fails, retry fails, Processor rescues Exception: job is NOT acknowledged. Sidekiq Pro's super_fetch will rescue the orphaned job at some point in the future. (See the sketch below.)
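The sketch below is purely illustrative and not code from this commit: it condenses the Processor's new acknowledgement decision into one method, with `dispatch` standing in for the real middleware/worker invocation and the re-raise/logging details omitted.

# Simplified, hypothetical sketch of the four cases above.
def process(work)
  ack = true
  job_hash = Sidekiq.load_json(work.job)
  dispatch(job_hash, work.queue_name)   # runs the worker; JobRetry raises Handled after writing a retry
rescue Sidekiq::Shutdown
  ack = false                           # case 3: shutting down, job goes back onto the queue
rescue Sidekiq::JobRetry::Handled
  # case 2: a retry was recorded in Redis, so acknowledging cannot lose the job
rescue Exception
  ack = false                           # case 4: the retry subsystem itself failed (e.g. Redis down);
                                        #         super_fetch can rescue the unacknowledged job later
ensure
  work.acknowledge if ack               # case 1: success; also reached in case 2
end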
mperham committed Apr 12, 2019
1 parent 2f37600 commit c650e9b
Showing 3 changed files with 28 additions and 19 deletions.
1 change: 1 addition & 0 deletions Changes.md
@@ -5,6 +5,7 @@
 HEAD
 ---------
 
+- Fix edge case where a job failure during Redis outage could result in a lost job [#4141]
 - Better handling of malformed job arguments in payload [#4095]
 - Restore bootstrap's dropdown css component [#4099, urkle]
 - Allow `Sidekiq::Worker#set` to be chained
9 changes: 5 additions & 4 deletions lib/sidekiq/job_retry.rb
@@ -56,7 +56,8 @@ module Sidekiq
   # end
   #
   class JobRetry
-    class Skip < ::RuntimeError; end
+    class Handled < ::RuntimeError; end
+    class Skip < Handled; end
 
     include Sidekiq::Util
 
@@ -71,7 +72,7 @@ def initialize(options = {})
     # require the worker to be instantiated.
     def global(msg, queue)
       yield
-    rescue Skip => ex
+    rescue Handled => ex
       raise ex
    rescue Sidekiq::Shutdown => ey
       # ignore, will be pushed back onto queue during hard_shutdown
@@ -92,7 +93,7 @@ def global(msg, queue)
         end
       end
 
-      raise e
+      raise Handled
     end
 
 
@@ -106,7 +107,7 @@ def global(msg, queue)
     # calling the handle_exception handlers.
     def local(worker, msg, queue)
       yield
-    rescue Skip => ex
+    rescue Handled => ex
       raise ex
    rescue Sidekiq::Shutdown => ey
       # ignore, will be pushed back onto queue during hard_shutdown
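An aside on why `raise Handled` with no argument is enough to preserve the job's original failure: Ruby automatically chains the exception currently being rescued onto the newly raised one via `Exception#cause`, which is what the Processor later reads back as `h.cause`. A standalone snippet (not Sidekiq code) showing that behavior:

# Re-raising inside a rescue block sets Exception#cause implicitly (Ruby >= 2.1).
class Handled < ::RuntimeError; end

begin
  begin
    raise ArgumentError, "boom"   # the job's real failure
  rescue ArgumentError
    # ...a retry would be pushed to Redis here, then we signal success...
    raise Handled                 # cause is set to the ArgumentError automatically
  end
rescue Handled => h
  puts h.cause.class              # => ArgumentError
  puts h.cause.message            # => "boom"
end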
37 changes: 22 additions & 15 deletions lib/sidekiq/processor.rb
@@ -147,21 +147,19 @@ def process(work)
       jobstr = work.job
       queue = work.queue_name
 
-      ack = false
+      # Treat malformed JSON as a special case: job goes straight to the morgue.
+      job_hash = nil
       begin
-        # Treat malformed JSON as a special case: job goes straight to the morgue.
-        job_hash = nil
-        begin
-          job_hash = Sidekiq.load_json(jobstr)
-        rescue => ex
-          handle_exception(ex, { :context => "Invalid JSON for job", :jobstr => jobstr })
-          # we can't notify because the job isn't a valid hash payload.
-          DeadSet.new.kill(jobstr, notify_failure: false)
-          ack = true
-          raise
-        end
+        job_hash = Sidekiq.load_json(jobstr)
+      rescue => ex
+        handle_exception(ex, { :context => "Invalid JSON for job", :jobstr => jobstr })
+        # we can't notify because the job isn't a valid hash payload.
+        DeadSet.new.kill(jobstr, notify_failure: false)
+        return work.acknowledge
+      end
 
-        ack = true
+      ack = true
+      begin
         dispatch(job_hash, queue) do |worker|
           Sidekiq.server_middleware.invoke(worker, job_hash, queue) do
             execute_job(worker, cloned(job_hash['args']))
@@ -172,10 +170,19 @@ def process(work)
         # within the timeout. Don't acknowledge the work since
         # we didn't properly finish it.
         ack = false
-      rescue Exception => ex
-        e = ex.is_a?(::Sidekiq::JobRetry::Skip) && ex.cause ? ex.cause : ex
+      rescue Sidekiq::JobRetry::Handled => h
+        # this is the common case: job raised error and Sidekiq::JobRetry::Handled
+        # signals that we created a retry successfully. We can acknowledge the job.
+        e = h.cause ? h.cause : h
         handle_exception(e, { :context => "Job raised exception", :job => job_hash, :jobstr => jobstr })
         raise e
+      rescue Exception => ex
+        # Unexpected error! This is very bad and indicates an exception that got past
+        # the retry subsystem (e.g. network partition). We won't acknowledge the job
+        # so it can be rescued when using Sidekiq Pro.
+        ack = false
+        handle_exception(ex, { :context => "Internal exception!", :job => job_hash, :jobstr => jobstr })
+        raise ex
       ensure
         work.acknowledge if ack
       end
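One practical consequence of unwrapping `h.cause` before calling `handle_exception`: registered error handlers keep receiving the job's original exception rather than the internal `Handled` wrapper. A hedged sketch, assuming the two-argument error handler API of this Sidekiq generation; the handler name and output format are illustrative only:

# Hypothetical handler; `ex` is the worker's own error, never JobRetry::Handled.
log_job_errors = lambda do |ex, ctx|
  # ctx mirrors the hash passed to handle_exception, e.g.
  # { :context => "Job raised exception", :job => {...}, :jobstr => "..." }
  puts "#{ctx[:context]}: #{ex.class}: #{ex.message}"
end

Sidekiq.configure_server do |config|
  config.error_handlers << log_job_errors
end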
