/
worker.rb
234 lines (186 loc) · 7.42 KB
/
worker.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
require "json"
require "pathname"
require "pp"
require "shellwords"
module RSpecQ
class Worker
HEARTBEAT_FREQUENCY = WORKER_LIVENESS_SEC / 6
# The root path or individual spec files to execute.
#
# Defaults to "spec" (just like in RSpec)
attr_accessor :files_or_dirs_to_run
# If true, job timings will be populated in the global Redis timings key
#
# Defaults to false
attr_accessor :populate_timings
# If set, spec files that are known to take more than this value to finish,
# will be split and scheduled on a per-example basis.
#
# Defaults to 999999
attr_accessor :file_split_threshold
# Retry failed examples up to N times (with N being the supplied value)
# before considering them legit failures
#
# Defaults to 3
attr_accessor :max_requeues
attr_reader :queue
def initialize(build_id:, worker_id:, redis_host:)
@build_id = build_id
@worker_id = worker_id
@queue = Queue.new(build_id, worker_id, redis_host)
@files_or_dirs_to_run = "spec"
@populate_timings = false
@file_split_threshold = 999999
@heartbeat_updated_at = nil
@max_requeues = 3
RSpec::Core::Formatters.register(Formatters::JobTimingRecorder, :dump_summary)
RSpec::Core::Formatters.register(Formatters::ExampleCountRecorder, :dump_summary)
RSpec::Core::Formatters.register(Formatters::FailureRecorder, :example_failed, :message)
RSpec::Core::Formatters.register(Formatters::WorkerHeartbeatRecorder, :example_finished)
end
def work
puts "Working for build #{@build_id} (worker=#{@worker_id})"
try_publish_queue!(queue)
queue.wait_until_published
loop do
# we have to bootstrap this so that it can be used in the first call
# to `requeue_lost_job` inside the work loop
update_heartbeat
lost = queue.requeue_lost_job
puts "Requeued lost job: #{lost}" if lost
# TODO: can we make `reserve_job` also act like exhausted? and get
# rid of `exhausted?` (i.e. return false if no jobs remain)
job = queue.reserve_job
# build is finished
return if job.nil? && queue.exhausted?
next if job.nil?
puts
puts "Executing #{job}"
reset_rspec_state!
# reconfigure rspec
RSpec.configuration.detail_color = :magenta
RSpec.configuration.seed = srand && srand % 0xFFFF
RSpec.configuration.backtrace_formatter.filter_gem('rspecq')
RSpec.configuration.add_formatter(Formatters::FailureRecorder.new(queue, job, max_requeues))
RSpec.configuration.add_formatter(Formatters::ExampleCountRecorder.new(queue))
RSpec.configuration.add_formatter(Formatters::WorkerHeartbeatRecorder.new(self))
if populate_timings
RSpec.configuration.add_formatter(Formatters::JobTimingRecorder.new(queue, job))
end
opts = RSpec::Core::ConfigurationOptions.new(["--format", "progress", job])
_result = RSpec::Core::Runner.new(opts).run($stderr, $stdout)
queue.acknowledge_job(job)
end
end
# Update the worker heartbeat if necessary
def update_heartbeat
if @heartbeat_updated_at.nil? || elapsed(@heartbeat_updated_at) >= HEARTBEAT_FREQUENCY
queue.record_worker_heartbeat
@heartbeat_updated_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
end
def try_publish_queue!(queue)
return if !queue.become_master
RSpec.configuration.files_or_directories_to_run = files_or_dirs_to_run
files_to_run = RSpec.configuration.files_to_run.map { |j| relative_path(j) }
timings = queue.timings
if timings.empty?
q_size = queue.publish(files_to_run.shuffle)
log_event(
"No timings found! Published queue in random order (size=#{q_size})",
"warning"
)
return
end
# prepare jobs to run
jobs = []
slow_files = []
if file_split_threshold
slow_files = timings.take_while do |_job, duration|
duration >= file_split_threshold
end.map(&:first) & files_to_run
end
if slow_files.any?
jobs.concat(files_to_run - slow_files)
jobs.concat(files_to_example_ids(slow_files))
else
jobs.concat(files_to_run)
end
default_timing = timings.values[timings.values.size/2]
# assign timings (based on previous runs) to all jobs
jobs = jobs.each_with_object({}) do |j, h|
puts "Untimed job: #{j}" if timings[j].nil?
# HEURISTIC: put jobs without previous timings (e.g. a newly added
# spec file) in the middle of the queue
h[j] = timings[j] || default_timing
end
# sort jobs based on their timings (slowest to be processed first)
jobs = jobs.sort_by { |_j, t| -t }.map(&:first)
puts "Published queue (size=#{queue.publish(jobs)})"
end
private
def reset_rspec_state!
RSpec.clear_examples
# see https://github.com/rspec/rspec-core/pull/2723
if Gem::Version.new(RSpec::Core::Version::STRING) <= Gem::Version.new("3.9.1")
RSpec.world.instance_variable_set(
:@example_group_counts_by_spec_file, Hash.new(0))
end
# RSpec.clear_examples does not reset those, which causes issues when
# a non-example error occurs (subsequent jobs are not executed)
# TODO: upstream
RSpec.world.non_example_failure = false
# we don't want an error that occured outside of the examples (which
# would set this to `true`) to stop the worker
RSpec.world.wants_to_quit = false
end
# NOTE: RSpec has to load the files before we can split them as individual
# examples. In case a file to be splitted fails to be loaded
# (e.g. contains a syntax error), we return the files unchanged, thereby
# falling back to scheduling them as whole files. Their errors will be
# reported in the normal flow when they're eventually picked up by a worker.
def files_to_example_ids(files)
cmd = "DISABLE_SPRING=1 bundle exec rspec --dry-run --format json #{files.join(' ')} 2>&1"
out = `#{cmd}`
cmd_result = $?
if !cmd_result.success?
rspec_output = begin
JSON.parse(out)
rescue JSON::ParserError
out
end
log_event(
"Failed to split slow files, falling back to regular scheduling",
"error",
rspec_output: rspec_output,
cmd_result: cmd_result.inspect,
)
pp rspec_output
return files
end
JSON.parse(out)["examples"].map { |e| e["id"] }
end
def relative_path(job)
@cwd ||= Pathname.new(Dir.pwd)
"./#{Pathname.new(job).relative_path_from(@cwd)}"
end
def elapsed(since)
Process.clock_gettime(Process::CLOCK_MONOTONIC) - since
end
# Prints msg to standard output and emits an event to Sentry, if the
# SENTRY_DSN environment variable is set.
def log_event(msg, level, additional={})
puts msg
Raven.capture_message(msg, level: level, extra: {
build: @build_id,
worker: @worker_id,
queue: queue.inspect,
files_or_dirs_to_run: files_or_dirs_to_run,
populate_timings: populate_timings,
file_split_threshold: file_split_threshold,
heartbeat_updated_at: @heartbeat_updated_at,
object: self.inspect,
pid: Process.pid,
}.merge(additional))
end
end
end