-
Notifications
You must be signed in to change notification settings - Fork 112
/
executor.rb
75 lines (61 loc) · 2.62 KB
/
executor.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# typed: strict
# frozen_string_literal: true
require "etc"
module Tapioca
  # Runs a block over a queue of items in parallel by forking worker
  # processes and collecting each worker's results through a pipe.
  class Executor
    extend T::Sig

    # Below this many items per worker, forking costs more than it saves.
    MINIMUM_ITEMS_PER_WORKER = T.let(2, Integer)

    sig { params(queue: T::Array[T.untyped], number_of_workers: T.nilable(Integer)).void }
    def initialize(queue, number_of_workers: nil)
      @queue = queue
      # Forking workers is expensive and not worth it for a low number of gems. Here we assign the number of workers to
      # be the minimum between the number of available processors (max) or the number of workers to make sure that each
      # one has at least MINIMUM_ITEMS_PER_WORKER items to process
      @number_of_workers = T.let(
        number_of_workers || [Etc.nprocessors, (queue.length.to_f / MINIMUM_ITEMS_PER_WORKER).ceil].min,
        Integer
      )
      # How many items each forked worker pulls off the queue. Guard the zero-worker case (empty
      # queue) explicitly: `0.0 / 0` is NaN and `NaN.ceil` raises FloatDomainError.
      @items_per_worker = T.let(
        @number_of_workers.zero? ? 0 : (queue.length.to_f / @number_of_workers).ceil,
        Integer
      )
    end

    sig do
      type_parameters(:T).params(
        block: T.proc.params(item: T.untyped).returns(T.type_parameter(:T))
      ).returns(T::Array[T.type_parameter(:T)])
    end
    def run_in_parallel(&block)
      # If we have zero or one workers, it's not worth forking, just run sequentially
      return @queue.map { |item| block.call(item) } if @number_of_workers <= 1
      read_pipes = []
      write_pipes = []
      # If we have more than one worker, fork the pool by shifting the expected number of items per worker from the
      # queue
      workers = (0...@number_of_workers).map do
        items = @queue.shift(@items_per_worker)
        # Each worker has their own pair of pipes, so that we can read the result from each worker separately
        read, write = IO.pipe
        read_pipes << read
        write_pipes << write
        fork do
          read.close
          result = items.map { |item| block.call(item) }
          # Pack the result as a Base64 string of the Marshal dump of the array of values returned by the block that we
          # ran in parallel. NOTE: Marshal is safe here because the data comes from our own child process,
          # never from untrusted input
          packed = [Marshal.dump(result)].pack("m")
          write.puts(packed)
          write.close
        end
      end
      # Close all the write pipes, then read and close from all the read pipes
      write_pipes.each(&:close)
      result = read_pipes.map do |pipe|
        content = pipe.read
        pipe.close
        content
      end
      # Wait until all the workers finish. Notice that waiting for the PIDs can only happen after we read and close the
      # pipe or else we may end up in a condition where writing to the pipe hangs indefinitely
      workers.each { |pid| Process.waitpid(pid) }
      # Decode the value back into the Ruby objects by doing the inverse of what each worker does
      result.flat_map { |item| T.unsafe(Marshal.load(item.unpack1("m"))) }
    end
  end
end