Skip to content

Commit

Permalink
FIX: Scheduler failed to run any jobs after Redis flush
Browse files Browse the repository at this point in the history
This adds a check that runs every 60 seconds. It schedules all available jobs for a queue if the keys of the per host queue and the global queue are missing from Redis.
  • Loading branch information
gschlager committed Oct 14, 2020
1 parent a397c33 commit 77267ee
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 3 deletions.
22 changes: 19 additions & 3 deletions lib/mini_scheduler/manager.rb
Expand Up @@ -12,11 +12,12 @@ def initialize(manager)
@manager = manager
@hostname = manager.hostname

@reschedule_orphans_thread = Thread.new do
@recovery_thread = Thread.new do
while !@stopped
sleep 60

@mutex.synchronize do
repair_queue
reschedule_orphans
end
end
Expand Down Expand Up @@ -45,6 +46,12 @@ def keep_alive
MiniScheduler.handle_job_exception(ex, message: "Scheduling manager keep-alive")
end

def repair_queue
@manager.repair_queue
rescue => ex
MiniScheduler.handle_job_exception(ex, message: "Scheduling manager queue repair")
end

def reschedule_orphans
@manager.reschedule_orphans!
rescue => ex
Expand Down Expand Up @@ -122,10 +129,10 @@ def stop!
@stopped = true

@keep_alive_thread.kill
@reschedule_orphans_thread.kill
@recovery_thread.kill

@keep_alive_thread.join
@reschedule_orphans_thread.join
@recovery_thread.join

enq(nil)

Expand Down Expand Up @@ -252,6 +259,15 @@ def get_klass(name)
nil
end

def repair_queue
return if redis.exists?(self.class.queue_key(queue)) ||
redis.exists?(self.class.queue_key(queue, hostname))

self.class.discover_schedules
.select { |schedule| schedule.queue == queue }
.each { |schedule| ensure_schedule!(schedule) }
end

def tick
lock do
schedule_next_job
Expand Down
29 changes: 29 additions & 0 deletions spec/mini_scheduler/manager_spec.rb
Expand Up @@ -195,6 +195,35 @@ def perform
manager.stop!
end

def queued_jobs(manager, with_hostname:)
hostname = with_hostname ? manager.hostname : nil
key = MiniScheduler::Manager.queue_key(manager.queue, hostname)
redis.zrange(key, 0, -1).map(&:constantize)
end

it 'should recover from Redis flush' do
manager = MiniScheduler::Manager.new(enable_stats: false)
manager.ensure_schedule!(Testing::SuperLongJob)
manager.ensure_schedule!(Testing::PerHostJob)

expect(queued_jobs(manager, with_hostname: false)).to include(Testing::SuperLongJob)
expect(queued_jobs(manager, with_hostname: true)).to include(Testing::PerHostJob)

redis.scan_each(match: "_scheduler_*") do |key|
redis.del(key)
end

expect(queued_jobs(manager, with_hostname: false)).to be_empty
expect(queued_jobs(manager, with_hostname: true)).to be_empty

manager.repair_queue

expect(queued_jobs(manager, with_hostname: false)).to include(Testing::SuperLongJob)
expect(queued_jobs(manager, with_hostname: true)).to include(Testing::PerHostJob)

manager.stop!
end

it 'should only run pending job once' do

Testing::RandomJob.runs = 0
Expand Down

0 comments on commit 77267ee

Please sign in to comment.