From 77267ee5c6833c1b4669edea2bd8ee94255d2186 Mon Sep 17 00:00:00 2001
From: Gerhard Schlager
Date: Thu, 15 Oct 2020 01:36:36 +0200
Subject: [PATCH] FIX: Scheduler failed to run any jobs after Redis flush

This adds a check that runs every 60 seconds. It schedules all
available jobs for a queue if the keys of the per host queue and the
global queue are missing from Redis.
---
 lib/mini_scheduler/manager.rb       | 22 +++++++++++++++++++---
 spec/mini_scheduler/manager_spec.rb | 29 +++++++++++++++++++++++++++++
 2 files changed, 48 insertions(+), 3 deletions(-)

diff --git a/lib/mini_scheduler/manager.rb b/lib/mini_scheduler/manager.rb
index 6a05728..ca15af0 100644
--- a/lib/mini_scheduler/manager.rb
+++ b/lib/mini_scheduler/manager.rb
@@ -12,11 +12,12 @@ def initialize(manager)
         @manager = manager
         @hostname = manager.hostname
 
-        @reschedule_orphans_thread = Thread.new do
+        @recovery_thread = Thread.new do
           while !@stopped
             sleep 60
 
             @mutex.synchronize do
+              repair_queue
               reschedule_orphans
             end
           end
@@ -45,6 +46,12 @@ def keep_alive
         MiniScheduler.handle_job_exception(ex, message: "Scheduling manager keep-alive")
       end
 
+      def repair_queue
+        @manager.repair_queue
+      rescue => ex
+        MiniScheduler.handle_job_exception(ex, message: "Scheduling manager queue repair")
+      end
+
       def reschedule_orphans
         @manager.reschedule_orphans!
       rescue => ex
@@ -122,10 +129,10 @@ def stop!
         @stopped = true
 
         @keep_alive_thread.kill
-        @reschedule_orphans_thread.kill
+        @recovery_thread.kill
 
         @keep_alive_thread.join
-        @reschedule_orphans_thread.join
+        @recovery_thread.join
 
         enq(nil)
 
@@ -252,6 +259,15 @@ def get_klass(name)
       nil
     end
 
+    def repair_queue
+      return if redis.exists?(self.class.queue_key(queue)) ||
+        redis.exists?(self.class.queue_key(queue, hostname))
+
+      self.class.discover_schedules
+        .select { |schedule| schedule.queue == queue }
+        .each { |schedule| ensure_schedule!(schedule) }
+    end
+
     def tick
       lock do
         schedule_next_job
diff --git a/spec/mini_scheduler/manager_spec.rb b/spec/mini_scheduler/manager_spec.rb
index 60b932e..39037d9 100644
--- a/spec/mini_scheduler/manager_spec.rb
+++ b/spec/mini_scheduler/manager_spec.rb
@@ -195,6 +195,35 @@ def perform
     manager.stop!
   end
 
+  def queued_jobs(manager, with_hostname:)
+    hostname = with_hostname ? manager.hostname : nil
+    key = MiniScheduler::Manager.queue_key(manager.queue, hostname)
+    redis.zrange(key, 0, -1).map(&:constantize)
+  end
+
+  it 'should recover from Redis flush' do
+    manager = MiniScheduler::Manager.new(enable_stats: false)
+    manager.ensure_schedule!(Testing::SuperLongJob)
+    manager.ensure_schedule!(Testing::PerHostJob)
+
+    expect(queued_jobs(manager, with_hostname: false)).to include(Testing::SuperLongJob)
+    expect(queued_jobs(manager, with_hostname: true)).to include(Testing::PerHostJob)
+
+    redis.scan_each(match: "_scheduler_*") do |key|
+      redis.del(key)
+    end
+
+    expect(queued_jobs(manager, with_hostname: false)).to be_empty
+    expect(queued_jobs(manager, with_hostname: true)).to be_empty
+
+    manager.repair_queue
+
+    expect(queued_jobs(manager, with_hostname: false)).to include(Testing::SuperLongJob)
+    expect(queued_jobs(manager, with_hostname: true)).to include(Testing::PerHostJob)
+
+    manager.stop!
+  end
+
   it 'should only run pending job once' do
 
     Testing::RandomJob.runs = 0