From 4d883194350cf9202ff5c1650730dfa5ee3d2e06 Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Thu, 28 Feb 2019 14:36:40 -0800 Subject: [PATCH] Refactor sidekiqctl so it can be tested easily --- bin/sidekiqctl | 239 ++-------------------------------------- lib/sidekiq/ctl.rb | 221 +++++++++++++++++++++++++++++++++++++ test/test_sidekiqctl.rb | 10 +- 3 files changed, 235 insertions(+), 235 deletions(-) create mode 100644 lib/sidekiq/ctl.rb diff --git a/bin/sidekiqctl b/bin/sidekiqctl index 297b34963..d5f9a42d5 100755 --- a/bin/sidekiqctl +++ b/bin/sidekiqctl @@ -2,236 +2,19 @@ require 'fileutils' require 'sidekiq/api' - -class Sidekiqctl - DEFAULT_KILL_TIMEOUT = 10 - CMD = File.basename($0) - - attr_reader :stage, :pidfile, :kill_timeout - - def self.print_usage - puts "#{CMD} - control Sidekiq from the command line." - puts - puts "Usage: #{CMD} quiet " - puts " #{CMD} stop " - puts " #{CMD} status
" - puts - puts " is path to a pidfile" - puts " is number of seconds to wait until Sidekiq exits" - puts " (default: #{Sidekiqctl::DEFAULT_KILL_TIMEOUT}), after which Sidekiq will be KILL'd" - puts - puts "
(optional) view a specific section of the status output" - puts " Valid sections are: #{Sidekiqctl::Status::VALID_SECTIONS.join(', ')}" - puts - puts "Be sure to set the kill_timeout LONGER than Sidekiq's -t timeout. If you want" - puts "to wait 60 seconds for jobs to finish, use `sidekiq -t 60` and `sidekiqctl stop" - puts " path_to_pidfile 61`" - puts - end - - def initialize(stage, pidfile, timeout) - @stage = stage - @pidfile = pidfile - @kill_timeout = timeout - - done('No pidfile given', :error) if !pidfile - done("Pidfile #{pidfile} does not exist", :warn) if !File.exist?(pidfile) - done('Invalid pidfile content', :error) if pid == 0 - - fetch_process - - begin - send(stage) - rescue NoMethodError - done "Invalid command: #{stage}", :error - end - end - - def fetch_process - Process.kill(0, pid) - rescue Errno::ESRCH - done "Process doesn't exist", :error - # We were not allowed to send a signal, but the process must have existed - # when Process.kill() was called. - rescue Errno::EPERM - return pid - end - - def done(msg, error = nil) - puts msg - exit(exit_signal(error)) - end - - def exit_signal(error) - (error == :error) ? 1 : 0 - end - - def pid - @pid ||= File.read(pidfile).to_i - end - - def quiet - `kill -TSTP #{pid}` - end - - def stop - `kill -TERM #{pid}` - kill_timeout.times do - begin - Process.kill(0, pid) - rescue Errno::ESRCH - FileUtils.rm_f pidfile - done 'Sidekiq shut down gracefully.' - rescue Errno::EPERM - done 'Not permitted to shut down Sidekiq.' - end - sleep 1 - end - `kill -9 #{pid}` - FileUtils.rm_f pidfile - done 'Sidekiq shut down forcefully.' - end - alias_method :shutdown, :stop - - class Status - VALID_SECTIONS = %w[all version overview processes queues] - def display(section = nil) - section ||= 'all' - unless VALID_SECTIONS.include? section - puts "I don't know how to check the status of '#{section}'!" - puts "Try one of these: #{VALID_SECTIONS.join(', ')}" - return - end - send(section) - rescue StandardError => e - puts "Couldn't get status: #{e}" - end - - def all - version - puts - overview - puts - processes - puts - queues - end - - def version - puts "Sidekiq #{Sidekiq::VERSION}" - puts Time.now - end - - def overview - puts '---- Overview ----' - puts " Processed: #{delimit stats.processed}" - puts " Failed: #{delimit stats.failed}" - puts " Busy: #{delimit stats.workers_size}" - puts " Enqueued: #{delimit stats.enqueued}" - puts " Retries: #{delimit stats.retry_size}" - puts " Scheduled: #{delimit stats.scheduled_size}" - puts " Dead: #{delimit stats.dead_size}" - end - - def processes - puts "---- Processes (#{process_set.size}) ----" - process_set.each_with_index do |process, index| - puts "#{process['identity']} #{tags_for(process)}" - puts " Started: #{Time.at(process['started_at'])} (#{time_ago(process['started_at'])})" - puts " Threads: #{process['concurrency']} (#{process['busy']} busy)" - puts " Queues: #{split_multiline(process['queues'].sort, pad: 11)}" - puts '' unless (index+1) == process_set.size - end - end - - COL_PAD = 2 - def queues - puts "---- Queues (#{queue_data.size}) ----" - columns = { - name: [:ljust, (['name'] + queue_data.map(&:name)).map(&:length).max + COL_PAD], - size: [:rjust, (['size'] + queue_data.map(&:size)).map(&:length).max + COL_PAD], - latency: [:rjust, (['latency'] + queue_data.map(&:latency)).map(&:length).max + COL_PAD] - } - columns.each { |col, (dir, width)| print col.to_s.upcase.public_send(dir, width) } - puts - queue_data.each do |q| - columns.each do |col, (dir, width)| - print q.send(col).public_send(dir, width) - end - puts - end - end - - private - - def delimit(number) - number.to_s.reverse.scan(/.{1,3}/).join(',').reverse - end - - def split_multiline(values, opts = {}) - return 'none' unless values - pad = opts[:pad] || 0 - max_length = opts[:max_length] || (80 - pad) - out = [] - line = '' - values.each do |value| - if (line.length + value.length) > max_length - out << line - line = ' ' * pad - end - line << value + ', ' - end - out << line[0..-3] - out.join("\n") - end - - def tags_for(process) - tags = [ - process['tag'], - process['labels'], - (process['quiet'] == 'true' ? 'quiet' : nil) - ].flatten.compact - tags.any? ? "[#{tags.join('] [')}]" : nil - end - - def time_ago(timestamp) - seconds = Time.now - Time.at(timestamp) - return 'just now' if seconds < 60 - return 'a minute ago' if seconds < 120 - return "#{seconds.floor / 60} minutes ago" if seconds < 3600 - return 'an hour ago' if seconds < 7200 - "#{seconds.floor / 60 / 60} hours ago" - end - - QUEUE_STRUCT = Struct.new(:name, :size, :latency) - def queue_data - @queue_data ||= Sidekiq::Queue.all.map do |q| - QUEUE_STRUCT.new(q.name, q.size.to_s, sprintf('%#.2f', q.latency)) - end - end - - def process_set - @process_set ||= Sidekiq::ProcessSet.new - end - - def stats - @stats ||= Sidekiq::Stats.new - end - end -end +require 'sidekiq/ctl' if ARGV[0] == 'status' - Sidekiqctl::Status.new.display(ARGV[1]) - exit -end - -if ARGV.length < 2 - Sidekiqctl.print_usage + Sidekiq::Ctl::Status.new.display(ARGV[1]) else - stage = ARGV[0] - pidfile = ARGV[1] - timeout = ARGV[2].to_i - timeout = Sidekiqctl::DEFAULT_KILL_TIMEOUT if timeout == 0 + if ARGV.length < 2 + Sidekiq::Ctl.print_usage + else + stage = ARGV[0] + pidfile = ARGV[1] + timeout = ARGV[2].to_i + timeout = Sidekiq::Ctl::DEFAULT_KILL_TIMEOUT if timeout == 0 - Sidekiqctl.new(stage, pidfile, timeout) + Sidekiq::Ctl.new(stage, pidfile, timeout) + end end diff --git a/lib/sidekiq/ctl.rb b/lib/sidekiq/ctl.rb new file mode 100644 index 000000000..ffcf6590c --- /dev/null +++ b/lib/sidekiq/ctl.rb @@ -0,0 +1,221 @@ +#!/usr/bin/env ruby + +require 'fileutils' +require 'sidekiq/api' + +class Sidekiq::Ctl + DEFAULT_KILL_TIMEOUT = 10 + CMD = File.basename($0) + + attr_reader :stage, :pidfile, :kill_timeout + + def self.print_usage + puts "#{CMD} - control Sidekiq from the command line." + puts + puts "Usage: #{CMD} quiet " + puts " #{CMD} stop " + puts " #{CMD} status
" + puts + puts " is path to a pidfile" + puts " is number of seconds to wait until Sidekiq exits" + puts " (default: #{Sidekiqctl::DEFAULT_KILL_TIMEOUT}), after which Sidekiq will be KILL'd" + puts + puts "
(optional) view a specific section of the status output" + puts " Valid sections are: #{Sidekiqctl::Status::VALID_SECTIONS.join(', ')}" + puts + puts "Be sure to set the kill_timeout LONGER than Sidekiq's -t timeout. If you want" + puts "to wait 60 seconds for jobs to finish, use `sidekiq -t 60` and `sidekiqctl stop" + puts " path_to_pidfile 61`" + puts + end + + def initialize(stage, pidfile, timeout) + @stage = stage + @pidfile = pidfile + @kill_timeout = timeout + + done('No pidfile given', :error) if !pidfile + done("Pidfile #{pidfile} does not exist", :warn) if !File.exist?(pidfile) + done('Invalid pidfile content', :error) if pid == 0 + + fetch_process + + begin + send(stage) + rescue NoMethodError + done "Invalid command: #{stage}", :error + end + end + + def fetch_process + Process.kill(0, pid) + rescue Errno::ESRCH + done "Process doesn't exist", :error + # We were not allowed to send a signal, but the process must have existed + # when Process.kill() was called. + rescue Errno::EPERM + return pid + end + + def done(msg, error = nil) + puts msg + exit(exit_signal(error)) + end + + def exit_signal(error) + (error == :error) ? 1 : 0 + end + + def pid + @pid ||= File.read(pidfile).to_i + end + + def quiet + `kill -TSTP #{pid}` + end + + def stop + `kill -TERM #{pid}` + kill_timeout.times do + begin + Process.kill(0, pid) + rescue Errno::ESRCH + FileUtils.rm_f pidfile + done 'Sidekiq shut down gracefully.' + rescue Errno::EPERM + done 'Not permitted to shut down Sidekiq.' + end + sleep 1 + end + `kill -9 #{pid}` + FileUtils.rm_f pidfile + done 'Sidekiq shut down forcefully.' + end + alias_method :shutdown, :stop + + class Status + VALID_SECTIONS = %w[all version overview processes queues] + def display(section = nil) + section ||= 'all' + unless VALID_SECTIONS.include? section + puts "I don't know how to check the status of '#{section}'!" + puts "Try one of these: #{VALID_SECTIONS.join(', ')}" + return + end + send(section) + rescue StandardError => e + puts "Couldn't get status: #{e}" + end + + def all + version + puts + overview + puts + processes + puts + queues + end + + def version + puts "Sidekiq #{Sidekiq::VERSION}" + puts Time.now + end + + def overview + puts '---- Overview ----' + puts " Processed: #{delimit stats.processed}" + puts " Failed: #{delimit stats.failed}" + puts " Busy: #{delimit stats.workers_size}" + puts " Enqueued: #{delimit stats.enqueued}" + puts " Retries: #{delimit stats.retry_size}" + puts " Scheduled: #{delimit stats.scheduled_size}" + puts " Dead: #{delimit stats.dead_size}" + end + + def processes + puts "---- Processes (#{process_set.size}) ----" + process_set.each_with_index do |process, index| + puts "#{process['identity']} #{tags_for(process)}" + puts " Started: #{Time.at(process['started_at'])} (#{time_ago(process['started_at'])})" + puts " Threads: #{process['concurrency']} (#{process['busy']} busy)" + puts " Queues: #{split_multiline(process['queues'].sort, pad: 11)}" + puts '' unless (index+1) == process_set.size + end + end + + COL_PAD = 2 + def queues + puts "---- Queues (#{queue_data.size}) ----" + columns = { + name: [:ljust, (['name'] + queue_data.map(&:name)).map(&:length).max + COL_PAD], + size: [:rjust, (['size'] + queue_data.map(&:size)).map(&:length).max + COL_PAD], + latency: [:rjust, (['latency'] + queue_data.map(&:latency)).map(&:length).max + COL_PAD] + } + columns.each { |col, (dir, width)| print col.to_s.upcase.public_send(dir, width) } + puts + queue_data.each do |q| + columns.each do |col, (dir, width)| + print q.send(col).public_send(dir, width) + end + puts + end + end + + private + + def delimit(number) + number.to_s.reverse.scan(/.{1,3}/).join(',').reverse + end + + def split_multiline(values, opts = {}) + return 'none' unless values + pad = opts[:pad] || 0 + max_length = opts[:max_length] || (80 - pad) + out = [] + line = '' + values.each do |value| + if (line.length + value.length) > max_length + out << line + line = ' ' * pad + end + line << value + ', ' + end + out << line[0..-3] + out.join("\n") + end + + def tags_for(process) + tags = [ + process['tag'], + process['labels'], + (process['quiet'] == 'true' ? 'quiet' : nil) + ].flatten.compact + tags.any? ? "[#{tags.join('] [')}]" : nil + end + + def time_ago(timestamp) + seconds = Time.now - Time.at(timestamp) + return 'just now' if seconds < 60 + return 'a minute ago' if seconds < 120 + return "#{seconds.floor / 60} minutes ago" if seconds < 3600 + return 'an hour ago' if seconds < 7200 + "#{seconds.floor / 60 / 60} hours ago" + end + + QUEUE_STRUCT = Struct.new(:name, :size, :latency) + def queue_data + @queue_data ||= Sidekiq::Queue.all.map do |q| + QUEUE_STRUCT.new(q.name, q.size.to_s, sprintf('%#.2f', q.latency)) + end + end + + def process_set + @process_set ||= Sidekiq::ProcessSet.new + end + + def stats + @stats ||= Sidekiq::Stats.new + end + end +end diff --git a/test/test_sidekiqctl.rb b/test/test_sidekiqctl.rb index 3d2efafdf..693031895 100644 --- a/test/test_sidekiqctl.rb +++ b/test/test_sidekiqctl.rb @@ -1,5 +1,6 @@ # frozen_string_literal: true require_relative 'helper' +require 'sidekiq/ctl' def capture_stdout $stdout = StringIO.new @@ -9,18 +10,13 @@ def capture_stdout $stdout = STDOUT end -capture_stdout do - ARGV = %w[status] - load 'bin/sidekiqctl' -end - def output(section = 'all') capture_stdout do - Sidekiqctl::Status.new.display(section) + Sidekiq::Ctl::Status.new.display(section) end end -describe Sidekiqctl do +describe Sidekiq::Ctl do describe 'status' do describe 'version' do it 'displays the current Sidekiq version' do