-
Notifications
You must be signed in to change notification settings - Fork 966
/
parallel_manager.rb
71 lines (59 loc) · 1.69 KB
/
parallel_manager.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# frozen_string_literal: true
module Faraday
class Adapter
class EMSynchrony < Faraday::Adapter
# A parallel manager for EMSynchrony.
class ParallelManager
# Add requests to queue.
#
# @param request [EM::HttpRequest]
# @param method [Symbol, String] HTTP method
# @param args [Array] the rest of the positional arguments
def add(request, method, *args, &block)
queue << {
request: request,
method: method,
args: args,
block: block
}
end
# Run all requests on queue with `EM::Synchrony::Multi`, wrapping
# it in a reactor and fiber if needed.
def run
result = nil
if !EM.reactor_running?
EM.run {
Fiber.new do
result = perform
EM.stop
end.resume
}
else
result = perform
end
result
end
private
# The request queue.
def queue
@queue ||= []
end
# Main `EM::Synchrony::Multi` performer.
def perform
multi = ::EM::Synchrony::Multi.new
queue.each do |item|
method = "a#{item[:method]}".to_sym
req = item[:request].send(method, *item[:args])
req.callback(&item[:block])
req_name = "req_#{multi.requests.size}".to_sym
multi.add(req_name, req)
end
# Clear the queue, so parallel manager objects can be reused.
@queue = []
# Block fiber until all requests have returned.
multi.perform
end
end
end
end
end