/
sidekiqload
executable file
·149 lines (133 loc) · 3.54 KB
/
sidekiqload
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#!/usr/bin/env ruby
# Quiet some warnings we see when running in warning mode:
# RUBYOPT=-w bundle exec sidekiq
$TESTING = false
#require 'ruby-prof'
Bundler.require(:default)
require_relative '../lib/sidekiq/cli'
require_relative '../lib/sidekiq/launcher'
include Sidekiq::Util
# Server-side settings for the load run: Redis database 13, the 'default'
# queue appended to the queue list, ERROR-level logging and an average
# scheduled-poll interval of 2.
Sidekiq.configure_server do |cfg|
  #cfg.options[:concurrency] = 1
  cfg.redis = { db: 13 }
  cfg.options[:queues] << 'default'
  cfg.logger.level = Logger::ERROR
  cfg.average_scheduled_poll_interval = 2
  # reliable! is only invoked when the Sidekiq::Pro constant is defined.
  if defined?(Sidekiq::Pro)
    cfg.reliable!
  end
end
# Worker enqueued by this script. Its perform body is empty, so running a job
# executes no worker code of its own.
class LoadWorker
  include Sidekiq::Worker

  # A single retry; the retry-in block always answers 1.
  sidekiq_options(retry: 1)
  sidekiq_retry_in { |_attempt| 1 }

  # idx - the single job argument; unused unless the line below is re-enabled.
  def perform(idx)
    #raise idx.to_s if idx % 100 == 1
  end
end
# brew tap shopify/shopify
# brew install toxiproxy
# gem install toxiproxy
#require 'toxiproxy'
# Simulate a non-localhost network for more realistic conditions.
# adding 1ms of network latency has an ENORMOUS impact on benchmarks
#Toxiproxy.populate([{
#"name": "redis",
#"listen": "127.0.0.1:6380",
#"upstream": "127.0.0.1:6379"
#}])
# Self-pipe: every trapped signal writes its own name (one line) into the
# pipe, so the signal can be read from self_read outside the trap handler.
self_read, self_write = IO.pipe

%w[INT TERM TSTP TTIN].each do |sig|
  begin
    trap(sig) { self_write.puts(sig) }
  rescue ArgumentError
    # trap rejected this signal name; report it and carry on with the rest.
    puts "Signal #{sig} not supported"
  end
end
# Flush the configured Redis database before any jobs are enqueued.
Sidekiq.redis do |conn|
  conn.flushdb
end
# Reacts to one signal name.
#
# launcher - object that receives #quiet when 'TSTP' arrives.
# sig      - signal name as a String: 'INT', 'TERM', 'TSTP' or 'TTIN'.
#            Any other value is only logged at debug level.
#
# Raises Interrupt for 'INT' and 'TERM'.
def handle_signal(launcher, sig)
  Sidekiq.logger.debug "Got #{sig} signal"
  case sig
  when 'INT'
    # Handle Ctrl-C in JRuby like MRI
    # http://jira.codehaus.org/browse/JRUBY-4637
    raise Interrupt
  when 'TERM'
    # Heroku sends TERM and then waits 30 seconds for process to exit.
    raise Interrupt
  when 'TSTP'
    Sidekiq.logger.info "Received TSTP, no longer accepting new work"
    launcher.quiet
  when 'TTIN'
    # Dump one header line plus the backtrace for every live thread.
    Thread.list.each do |thread|
      Sidekiq.logger.warn "Thread TID-#{(thread.object_id ^ ::Process.pid).to_s(36)} #{thread['label']}"
      # Take the backtrace exactly once: a thread can finish between two
      # calls, and the second call would then return nil and break #join.
      backtrace = thread.backtrace
      if backtrace
        Sidekiq.logger.warn backtrace.join("\n")
      else
        Sidekiq.logger.warn "<no backtrace available>"
      end
    end
  end
end
# Runs `ps -o rss=` for the current pid and returns its output converted with
# String#to_i (so non-numeric output yields 0).
def Process.rss
  ps_output = `ps -o rss= -p #{Process.pid}`
  ps_output.chomp.to_i
end
# Enqueue `iter` bulk batches of `count` LoadWorker jobs each; job number idx
# within a batch carries the one-element argument list [idx].
iter = 10
count = 10_000

iter.times do
  job_args = Array.new(count) { |idx| [idx] }
  Sidekiq::Client.push_bulk('class' => LoadWorker, 'args' => job_args)
end

Sidekiq.logger.error "Created #{count*iter} jobs"
# Background thread that, once per second, reads the length of
# "queue:default" and the cardinality of "retry" in one pipelined call, logs
# RSS and the pending total, and calls exit(0) as soon as the total is zero.
Monitoring = Thread.new do
  watchdog("monitor thread") do
    loop do
      sleep 1

      counts = Sidekiq.redis do |conn|
        conn.pipelined do
          conn.llen "queue:default"
          conn.zcard "retry"
        end
      end
      qsize, retries = counts.map(&:to_i)
      total = qsize + retries

      #GC.start
      Sidekiq.logger.error("RSS: #{Process.rss} Pending: #{total}")
      next unless total.zero?

      Sidekiq.logger.error("Done")
      exit(0)
    end
  end
end
# Main section: start the launcher, then block on the self-pipe and pass each
# signal name read from it to handle_signal until an exception ends the loop.
begin
#RubyProf::exclude_threads = [ Monitoring ]
#RubyProf.start
# NOTE(review): fire_event is not defined in this file; presumably it comes
# from the Sidekiq::Util include near the top — confirm.
fire_event(:startup)
#Sidekiq.logger.error "Simulating 1ms of latency between Sidekiq and redis"
#Toxiproxy[:redis].downstream(:latency, latency: 1).apply do
launcher = Sidekiq::Launcher.new(Sidekiq.options)
launcher.run
# IO.select blocks until the read end of the pipe has data; each line read is
# one signal name written by a trap handler.
while readable_io = IO.select([self_read])
signal = readable_io.first[0].gets.strip
handle_signal(launcher, signal)
end
# Closes the commented-out Toxiproxy `do` block above; re-enable both together.
#end
# SystemExit is swallowed here (the body is only commented-out profiling
# code), so the script then ends normally. Presumably this is the exit(0)
# issued by the Monitoring thread — confirm.
rescue SystemExit => e
#Sidekiq.logger.error("Profiling...")
#result = RubyProf.stop
#printer = RubyProf::GraphHtmlPrinter.new(result)
#printer.print(File.new("output.html", "w"), :min_percent => 1)
# normal
# Any StandardError: re-raise when $DEBUG is set, otherwise print the message
# and backtrace to STDERR and exit with status 1.
rescue => e
raise e if $DEBUG
STDERR.puts e.message
STDERR.puts e.backtrace.join("\n")
exit 1
end