forked from Dynflow/dynflow
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_helper.rb
442 lines (375 loc) · 12.9 KB
/
test_helper.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
# frozen_string_literal: true
require 'bundler/setup'
require 'minitest/reporters'
require 'minitest/autorun'
require 'minitest/spec'
ENV['MT_NO_PLUGINS'] = 'true'
MiniTest::Reporters.use! if ENV['RM_INFO']
load_path = File.expand_path(File.dirname(__FILE__))
$LOAD_PATH << load_path unless $LOAD_PATH.include? load_path
require 'dynflow'
require 'dynflow/testing'
begin require 'pry'; rescue LoadError; nil end
require 'support/code_workflow_example'
require 'support/middleware_example'
require 'support/rescue_example'
require 'support/dummy_example'
require 'support/test_execution_log'
# To be able to stop a process in some step and perform assertions while paused
class TestPause
def self.setup
@pause = Concurrent::Promises.resolvable_future
@ready = Concurrent::Promises.resolvable_future
end
def self.teardown
@pause = nil
@ready = nil
end
# to be called from action
def self.pause
if !@pause
raise 'the TestPause class was not setup'
elsif @ready.resolved?
raise 'you can pause only once'
else
@ready.fulfill(true)
@pause.wait
end
end
# in the block perform assertions
def self.when_paused
if @pause
@ready.wait # wait till we are paused
yield
@pause.fulfill(true) # resume the run
else
raise 'the TestPause class was not setup'
end
end
end
class CoordiationAdapterWithLog < Dynflow::CoordinatorAdapters::Sequel
attr_reader :lock_log
def initialize(*args)
@lock_log = []
super
end
def create_record(record)
@lock_log << "lock #{record.id}" if record.is_a? Dynflow::Coordinator::Lock
super
end
def delete_record(record)
@lock_log << "unlock #{record.id}" if record.is_a? Dynflow::Coordinator::Lock
super
end
end
module WorldFactory
def self.created_worlds
@created_worlds ||= []
end
def self.test_world_config
config = Dynflow::Config.new
config.persistence_adapter = persistence_adapter
config.logger_adapter = logger_adapter
config.coordinator_adapter = coordinator_adapter
config.delayed_executor = nil
config.auto_rescue = false
config.auto_validity_check = false
config.exit_on_terminate = false
config.auto_execute = false
config.auto_terminate = false
config.backup_deleted_plans = false
config.backup_dir = nil
config.queues.add(:slow, :pool_size => 1)
yield config if block_given?
return config
end
# The worlds created by this method are getting terminated after each test run
def self.create_world(klass = Dynflow::World, &block)
klass.new(test_world_config(&block)).tap do |world|
created_worlds << world
end
end
# This world survives though the whole run of the test suite: careful with it, it can
# introduce unnecessary test dependencies
def self.logger_adapter
@adapter ||= Dynflow::LoggerAdapters::Simple.new $stderr, ::Logger::FATAL
end
def self.persistence_adapter
@persistence_adapter ||= begin
db_config = ENV['DB_CONN_STRING'] || 'sqlite:/'
puts "Using database configuration: #{db_config}"
Dynflow::PersistenceAdapters::Sequel.new(db_config)
end
end
def self.coordinator_adapter
->(world, _) { CoordiationAdapterWithLog.new(world) }
end
def self.clean_coordinator_records
persistence_adapter = WorldFactory.persistence_adapter
persistence_adapter.find_coordinator_records({}).each do |w|
warn "Unexpected coordinator record: #{ w }"
persistence_adapter.delete_coordinator_record(w[:class], w[:id])
end
end
def self.terminate_worlds
created_worlds.map(&:terminate).map(&:wait)
created_worlds.clear
end
end
module TestHelpers
# allows to create the world inside the tests, using the `connector`
# and `persistence adapter` from the test context: usefull to create
# multi-world topology for a signle test
def create_world(with_executor = true, &block)
WorldFactory.create_world do |config|
config.connector = connector
config.persistence_adapter = persistence_adapter
unless with_executor
config.executor = false
end
block.call(config) if block
end
end
def connector_polling_interval(world)
if world.persistence.adapter.db.class.name == "Sequel::Postgres::Database"
5
else
0.005
end
end
# get director for deeper investigation of the current execution state
def get_director(world)
core_context = world.executor.instance_variable_get('@core').instance_variable_get('@core').context
core_context.instance_variable_get('@director')
end
# waits for the passed block to return non-nil value and reiterates it while getting false
# (till some reasonable timeout). Useful for forcing the tests for some event to occur
def wait_for(waiting_message = 'something to happen')
30.times do
ret = yield
return ret if ret
sleep 0.3
end
assert false, "waiting for #{waiting_message} was not successful"
end
def executor_id_for_plan(execution_plan_id)
if lock = client_world.coordinator.find_locks(class: Dynflow::Coordinator::ExecutionLock.name,
id: "execution-plan:#{execution_plan_id}").first
lock.world_id
end
end
def trigger_waiting_action
triggered = client_world.trigger(Support::DummyExample::DeprecatedEventedAction)
wait_for { executor_id_for_plan(triggered.id) } # waiting for the plan to be picked by an executor
triggered
end
# trigger an action, and keep it running while yielding the block
def while_executing_plan
triggered = trigger_waiting_action
executor_id = wait_for do
executor_id_for_plan(triggered.id)
end
plan = client_world.persistence.load_execution_plan(triggered.id)
step = plan.steps.values.last
wait_for do
client_world.persistence.load_step(step.execution_plan_id, step.id, client_world).state == :suspended
end
executor = WorldFactory.created_worlds.find { |e| e.id == executor_id }
raise "Could not find an executor with id #{executor_id}" unless executor
yield executor
return triggered
end
# finish the plan triggered by the `while_executing_plan` method
def finish_the_plan(triggered)
wait_for do
client_world.persistence.load_execution_plan(triggered.id).state == :running &&
client_world.persistence.load_step(triggered.id, 2, client_world).state == :suspended
end
client_world.event(triggered.id, 2, 'finish')
return triggered.finished.value
end
def assert_plan_reexecuted(plan)
assert_equal :stopped, plan.state
assert_equal :success, plan.result
assert_equal ['start execution',
'terminate execution',
'start execution',
'finish execution'],
plan.execution_history.map(&:name)
refute_equal plan.execution_history.first.world_id, plan.execution_history.to_a.last.world_id
end
end
class MiniTest::Test
def setup
WorldFactory.clean_coordinator_records
end
def teardown
WorldFactory.terminate_worlds
end
end
# ensure there are no unresolved events at the end or being GCed
events_test = -> do
event_creations = {}
non_ready_events = {}
Concurrent::Promises::Event.singleton_class.send :define_method, :new do |*args, &block|
super(*args, &block).tap do |event|
event_creations[event.object_id] = caller(4)
end
end
[Concurrent::Promises::Event, Concurrent::Promises::ResolvableFuture].each do |future_class|
original_resolved_method = future_class.instance_method :resolve_with
future_class.send :define_method, :resolve_with do |*args|
begin
original_resolved_method.bind(self).call(*args)
ensure
event_creations.delete(self.object_id)
end
end
end
MiniTest.after_run do
Concurrent::Actor.root.ask!(:terminate!)
non_ready_events = ObjectSpace.each_object(Concurrent::Promises::Event).map do |event|
event.wait(1)
unless event.resolved?
event.object_id
end
end.compact
# make sure to include the ids that were garbage-collected already
non_ready_events = (non_ready_events + event_creations.keys).uniq
unless non_ready_events.empty?
unified = non_ready_events.each_with_object({}) do |(id, _), h|
backtrace_key = event_creations[id].hash
h[backtrace_key] ||= []
h[backtrace_key] << id
end
raise("there were #{non_ready_events.size} non_ready_events:\n" +
unified.map do |_, ids|
"--- #{ids.size}: #{ids}\n#{event_creations[ids.first].join("\n")}"
end.join("\n"))
end
end
# time out all futures by default
default_timeout = 8
wait_method = Concurrent::Promises::AbstractEventFuture.instance_method(:wait)
Concurrent::Promises::AbstractEventFuture.class_eval do
define_method :wait do |timeout = nil|
wait_method.bind(self).call(timeout || default_timeout)
end
end
end
events_test.call
class ConcurrentRunTester
def initialize
@enter_future, @exit_future = Concurrent::Promises.resolvable_future, Concurrent::Promises.resolvable_future
end
def while_executing(&block)
@thread = Thread.new do
block.call(self)
end
@enter_future.wait(1)
end
def pause
@enter_future.fulfill(true)
@exit_future.wait(1)
end
def finish
@exit_future.fulfill(true)
@thread.join
end
end
module PlanAssertions
def inspect_flow(execution_plan, flow)
out = "".dup
inspect_subflow(out, execution_plan, flow, "".dup)
out
end
def inspect_plan_steps(execution_plan)
out = "".dup
inspect_plan_step(out, execution_plan, execution_plan.root_plan_step, "".dup)
out
end
def assert_planning_success(execution_plan)
execution_plan.plan_steps.all? { |plan_step| _(plan_step.state).must_equal :success, plan_step.error }
end
def assert_run_flow(expected, execution_plan)
assert_planning_success(execution_plan)
_(inspect_flow(execution_plan, execution_plan.run_flow).chomp).must_equal dedent(expected).chomp
end
def assert_finalize_flow(expected, execution_plan)
assert_planning_success(execution_plan)
_(inspect_flow(execution_plan, execution_plan.finalize_flow).chomp).must_equal dedent(expected).chomp
end
def assert_run_flow_equal(expected_plan, execution_plan)
expected = inspect_flow(expected_plan, expected_plan.run_flow)
current = inspect_flow(execution_plan, execution_plan.run_flow)
assert_equal expected, current
end
def assert_steps_equal(expected, current)
_(current.id).must_equal expected.id
_(current.class).must_equal expected.class
_(current.state).must_equal expected.state
_(current.action_class).must_equal expected.action_class
_(current.action_id).must_equal expected.action_id
if expected.respond_to?(:children)
_(current.children).must_equal(expected.children)
end
end
def assert_plan_steps(expected, execution_plan)
_(inspect_plan_steps(execution_plan).chomp).must_equal dedent(expected).chomp
end
def assert_finalized(action_class, input)
assert_executed(:finalize, action_class, input)
end
def assert_executed(phase, action_class, input)
log = TestExecutionLog.send(phase).log
found_log = log.any? do |(logged_action_class, logged_input)|
action_class == logged_action_class && input == logged_input
end
unless found_log
message = ["#{action_class} with input #{input.inspect} not executed in #{phase} phase"]
message << "following actions were executed:"
log.each do |(logged_action_class, logged_input)|
message << "#{logged_action_class} #{logged_input.inspect}"
end
raise message.join("\n")
end
end
def inspect_subflow(out, execution_plan, flow, prefix)
case flow
when Dynflow::Flows::Atom
out << prefix
out << flow.step_id.to_s << ': '
step = execution_plan.steps[flow.step_id]
out << step.action_class.to_s[/\w+\Z/]
out << "(#{step.state})"
out << ' '
action = execution_plan.world.persistence.load_action(step)
out << action.input.inspect
unless step.state == :pending
out << ' --> '
out << action.output.inspect
end
out << "\n"
else
out << prefix << flow.class.name << "\n"
flow.sub_flows.each do |sub_flow|
inspect_subflow(out, execution_plan, sub_flow, prefix + ' ')
end
end
out
end
def inspect_plan_step(out, execution_plan, plan_step, prefix)
out << prefix
out << plan_step.action_class.to_s[/\w+\Z/]
out << "\n"
plan_step.children.each do |sub_step_id|
sub_step = execution_plan.steps[sub_step_id]
inspect_plan_step(out, execution_plan, sub_step, prefix + ' ')
end
out
end
def dedent(string)
dedent = string.scan(/^ */).map { |spaces| spaces.size }.min
string.lines.map { |line| line[dedent..-1] }.join
end
end