Skip to content

Commit

Permalink
Merge pull request #889 from Shopify/fairer-queueing
Browse files: browse the repository at this point in the history
Fairer queueing for parallel workers in RBI generation
  • Loading branch information
Emily Giurleo committed Apr 8, 2022
2 parents cdaf17a + d41c995 commit 9a1cb24
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 59 deletions.
1 change: 1 addition & 0 deletions Gemfile.lock
Expand Up @@ -10,6 +10,7 @@ PATH
specs:
tapioca (0.7.0)
bundler (>= 1.17.3)
parallel (>= 1.21.0)
pry (>= 0.12.2)
rbi (~> 0.0.0, >= 0.0.14)
sorbet-static-and-runtime (>= 0.5.9204)
Expand Down
50 changes: 4 additions & 46 deletions lib/tapioca/executor.rb
Expand Up @@ -2,6 +2,7 @@
# frozen_string_literal: true

require "etc"
require "parallel"

module Tapioca
class Executor
Expand All @@ -20,10 +21,6 @@ def initialize(queue, number_of_workers: nil)
number_of_workers || [Etc.nprocessors, (queue.length.to_f / MINIMUM_ITEMS_PER_WORKER).ceil].min,
Integer
)

# The number of items that will be processed per worker, so that we can split the queue into groups and assign
# them to each one of the workers
@items_per_worker = T.let((queue.length.to_f / @number_of_workers).ceil, Integer)
end

sig do
Expand All @@ -32,48 +29,9 @@ def initialize(queue, number_of_workers: nil)
).returns(T::Array[T.type_parameter(:T)])
end
def run_in_parallel(&block)
# If we only have one worker selected, it's not worth forking, just run sequentially
return @queue.map { |item| block.call(item) } if @number_of_workers == 1

read_pipes = []
write_pipes = []

# If we have more than one worker, fork the pool by shifting the expected number of items per worker from the
# queue
workers = (0...@number_of_workers).map do
items = @queue.shift(@items_per_worker)

# Each worker has their own pair of pipes, so that we can read the result from each worker separately
read, write = IO.pipe
read_pipes << read
write_pipes << write

fork do
read.close
result = items.map { |item| block.call(item) }

# Pack the result as a Base64 string of the Marshal dump of the array of values returned by the block that we
# ran in parallel
packed = [Marshal.dump(result)].pack("m")
write.puts(packed)
write.close
end
end

# Close all the write pipes, then read and close from all the read pipes
write_pipes.each(&:close)
result = read_pipes.map do |pipe|
content = pipe.read
pipe.close
content
end

# Wait until all the workers finish. Notice that waiting for the PIDs can only happen after we read and close the
# pipe or else we may end up in a condition where writing to the pipe hangs indefinitely
workers.each { |pid| Process.waitpid(pid) }

# Decode the value back into the Ruby objects by doing the inverse of what each worker does
result.flat_map { |item| T.unsafe(Marshal.load(item.unpack1("m"))) }
# To have the parallel gem run jobs in the parent process, you must pass 0 as the number of processes
number_of_processes = @number_of_workers == 1 ? 0 : @number_of_workers
Parallel.map(@queue, { in_processes: number_of_processes }, &block)
end
end
end
15 changes: 2 additions & 13 deletions spec/executor_spec.rb
Expand Up @@ -11,22 +11,11 @@ class ExecutorSpec < Minitest::Spec
@executor = T.let(Executor.new(@queue), Executor)
end

it "splits the queue into a group per worker" do
received_numbers = []

@executor.run_in_parallel do |number|
received_numbers << number
end

assert_equal(@queue.length, received_numbers.length)
assert_equal(@queue, received_numbers.sort)
end

it "runs sequentially when the number of workers is one" do
executor = Executor.new(@queue, number_of_workers: 1)
parent_pid = Process.pid

executor.run_in_parallel do |_|
executor.run_in_parallel do
assert_equal(parent_pid, Process.pid)
end
end
Expand All @@ -35,7 +24,7 @@ class ExecutorSpec < Minitest::Spec
executor = Executor.new(@queue, number_of_workers: 4)
parent_pid = Process.pid

executor.run_in_parallel do |_|
executor.run_in_parallel do
refute_equal(parent_pid, Process.pid)
end
end
Expand Down
1 change: 1 addition & 0 deletions tapioca.gemspec
Expand Up @@ -24,6 +24,7 @@ Gem::Specification.new do |spec|
spec.metadata["allowed_push_host"] = "https://rubygems.org"

spec.add_dependency("bundler", ">= 1.17.3")
spec.add_dependency("parallel", ">= 1.21.0")
spec.add_dependency("pry", ">= 0.12.2")
spec.add_dependency("rbi", "~> 0.0.0", ">= 0.0.14")
spec.add_dependency("sorbet-static-and-runtime", ">= 0.5.9204")
Expand Down

0 comments on commit 9a1cb24

Please sign in to comment.