Skip to content

Commit

Permalink
Refactor sidekiqctl so it can be tested easily
Browse files Browse the repository at this point in the history
  • Loading branch information
mperham committed Feb 28, 2019
1 parent 20f4cdb commit 4d88319
Show file tree
Hide file tree
Showing 3 changed files with 235 additions and 235 deletions.
239 changes: 11 additions & 228 deletions bin/sidekiqctl
Expand Up @@ -2,236 +2,19 @@

require 'fileutils'
require 'sidekiq/api'

class Sidekiqctl
DEFAULT_KILL_TIMEOUT = 10
CMD = File.basename($0)

attr_reader :stage, :pidfile, :kill_timeout

def self.print_usage
puts "#{CMD} - control Sidekiq from the command line."
puts
puts "Usage: #{CMD} quiet <pidfile> <kill_timeout>"
puts " #{CMD} stop <pidfile> <kill_timeout>"
puts " #{CMD} status <section>"
puts
puts " <pidfile> is path to a pidfile"
puts " <kill_timeout> is number of seconds to wait until Sidekiq exits"
puts " (default: #{Sidekiqctl::DEFAULT_KILL_TIMEOUT}), after which Sidekiq will be KILL'd"
puts
puts " <section> (optional) view a specific section of the status output"
puts " Valid sections are: #{Sidekiqctl::Status::VALID_SECTIONS.join(', ')}"
puts
puts "Be sure to set the kill_timeout LONGER than Sidekiq's -t timeout. If you want"
puts "to wait 60 seconds for jobs to finish, use `sidekiq -t 60` and `sidekiqctl stop"
puts " path_to_pidfile 61`"
puts
end

def initialize(stage, pidfile, timeout)
@stage = stage
@pidfile = pidfile
@kill_timeout = timeout

done('No pidfile given', :error) if !pidfile
done("Pidfile #{pidfile} does not exist", :warn) if !File.exist?(pidfile)
done('Invalid pidfile content', :error) if pid == 0

fetch_process

begin
send(stage)
rescue NoMethodError
done "Invalid command: #{stage}", :error
end
end

def fetch_process
Process.kill(0, pid)
rescue Errno::ESRCH
done "Process doesn't exist", :error
# We were not allowed to send a signal, but the process must have existed
# when Process.kill() was called.
rescue Errno::EPERM
return pid
end

def done(msg, error = nil)
puts msg
exit(exit_signal(error))
end

def exit_signal(error)
(error == :error) ? 1 : 0
end

def pid
@pid ||= File.read(pidfile).to_i
end

def quiet
`kill -TSTP #{pid}`
end

def stop
`kill -TERM #{pid}`
kill_timeout.times do
begin
Process.kill(0, pid)
rescue Errno::ESRCH
FileUtils.rm_f pidfile
done 'Sidekiq shut down gracefully.'
rescue Errno::EPERM
done 'Not permitted to shut down Sidekiq.'
end
sleep 1
end
`kill -9 #{pid}`
FileUtils.rm_f pidfile
done 'Sidekiq shut down forcefully.'
end
alias_method :shutdown, :stop

class Status
VALID_SECTIONS = %w[all version overview processes queues]
def display(section = nil)
section ||= 'all'
unless VALID_SECTIONS.include? section
puts "I don't know how to check the status of '#{section}'!"
puts "Try one of these: #{VALID_SECTIONS.join(', ')}"
return
end
send(section)
rescue StandardError => e
puts "Couldn't get status: #{e}"
end

def all
version
puts
overview
puts
processes
puts
queues
end

def version
puts "Sidekiq #{Sidekiq::VERSION}"
puts Time.now
end

def overview
puts '---- Overview ----'
puts " Processed: #{delimit stats.processed}"
puts " Failed: #{delimit stats.failed}"
puts " Busy: #{delimit stats.workers_size}"
puts " Enqueued: #{delimit stats.enqueued}"
puts " Retries: #{delimit stats.retry_size}"
puts " Scheduled: #{delimit stats.scheduled_size}"
puts " Dead: #{delimit stats.dead_size}"
end

def processes
puts "---- Processes (#{process_set.size}) ----"
process_set.each_with_index do |process, index|
puts "#{process['identity']} #{tags_for(process)}"
puts " Started: #{Time.at(process['started_at'])} (#{time_ago(process['started_at'])})"
puts " Threads: #{process['concurrency']} (#{process['busy']} busy)"
puts " Queues: #{split_multiline(process['queues'].sort, pad: 11)}"
puts '' unless (index+1) == process_set.size
end
end

COL_PAD = 2
def queues
puts "---- Queues (#{queue_data.size}) ----"
columns = {
name: [:ljust, (['name'] + queue_data.map(&:name)).map(&:length).max + COL_PAD],
size: [:rjust, (['size'] + queue_data.map(&:size)).map(&:length).max + COL_PAD],
latency: [:rjust, (['latency'] + queue_data.map(&:latency)).map(&:length).max + COL_PAD]
}
columns.each { |col, (dir, width)| print col.to_s.upcase.public_send(dir, width) }
puts
queue_data.each do |q|
columns.each do |col, (dir, width)|
print q.send(col).public_send(dir, width)
end
puts
end
end

private

def delimit(number)
number.to_s.reverse.scan(/.{1,3}/).join(',').reverse
end

def split_multiline(values, opts = {})
return 'none' unless values
pad = opts[:pad] || 0
max_length = opts[:max_length] || (80 - pad)
out = []
line = ''
values.each do |value|
if (line.length + value.length) > max_length
out << line
line = ' ' * pad
end
line << value + ', '
end
out << line[0..-3]
out.join("\n")
end

def tags_for(process)
tags = [
process['tag'],
process['labels'],
(process['quiet'] == 'true' ? 'quiet' : nil)
].flatten.compact
tags.any? ? "[#{tags.join('] [')}]" : nil
end

def time_ago(timestamp)
seconds = Time.now - Time.at(timestamp)
return 'just now' if seconds < 60
return 'a minute ago' if seconds < 120
return "#{seconds.floor / 60} minutes ago" if seconds < 3600
return 'an hour ago' if seconds < 7200
"#{seconds.floor / 60 / 60} hours ago"
end

QUEUE_STRUCT = Struct.new(:name, :size, :latency)
def queue_data
@queue_data ||= Sidekiq::Queue.all.map do |q|
QUEUE_STRUCT.new(q.name, q.size.to_s, sprintf('%#.2f', q.latency))
end
end

def process_set
@process_set ||= Sidekiq::ProcessSet.new
end

def stats
@stats ||= Sidekiq::Stats.new
end
end
end
require 'sidekiq/ctl'

if ARGV[0] == 'status'
Sidekiqctl::Status.new.display(ARGV[1])
exit
end

if ARGV.length < 2
Sidekiqctl.print_usage
Sidekiq::Ctl::Status.new.display(ARGV[1])
else
stage = ARGV[0]
pidfile = ARGV[1]
timeout = ARGV[2].to_i
timeout = Sidekiqctl::DEFAULT_KILL_TIMEOUT if timeout == 0
if ARGV.length < 2
Sidekiq::Ctl.print_usage
else
stage = ARGV[0]
pidfile = ARGV[1]
timeout = ARGV[2].to_i
timeout = Sidekiq::Ctl::DEFAULT_KILL_TIMEOUT if timeout == 0

Sidekiqctl.new(stage, pidfile, timeout)
Sidekiq::Ctl.new(stage, pidfile, timeout)
end
end

0 comments on commit 4d88319

Please sign in to comment.