Fairer queueing for parallel workers in RBI generation #889

Merged
merged 2 commits into from Apr 8, 2022
1 change: 1 addition & 0 deletions Gemfile.lock
@@ -10,6 +10,7 @@ PATH
specs:
tapioca (0.7.0)
bundler (>= 1.17.3)
parallel (>= 1.21.0)
pry (>= 0.12.2)
rbi (~> 0.0.0, >= 0.0.14)
sorbet-static-and-runtime (>= 0.5.9204)
50 changes: 4 additions & 46 deletions lib/tapioca/executor.rb
@@ -2,6 +2,7 @@
# frozen_string_literal: true

require "etc"
require "parallel"

module Tapioca
class Executor
@@ -20,10 +21,6 @@ def initialize(queue, number_of_workers: nil)
number_of_workers || [Etc.nprocessors, (queue.length.to_f / MINIMUM_ITEMS_PER_WORKER).ceil].min,
Integer
)

# The number of items that will be processed per worker, so that we can split the queue into groups and assign
# them to each one of the workers
@items_per_worker = T.let((queue.length.to_f / @number_of_workers).ceil, Integer)
end
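
A quick worked example of the worker-count formula kept above: the pool is capped at the machine's processor count and shrinks further when the queue is small. The concrete values below (processor count, `MINIMUM_ITEMS_PER_WORKER`) are assumed for illustration only; the real ones come from `Etc.nprocessors` and the constant defined elsewhere in `Executor`.

```ruby
# Illustrative values only; not taken from the actual class.
nprocessors = 8
minimum_items_per_worker = 2

short_queue = 6
[nprocessors, (short_queue.to_f / minimum_items_per_worker).ceil].min
# => [8, 3].min => 3 workers for a 6-item queue

long_queue = 100
[nprocessors, (long_queue.to_f / minimum_items_per_worker).ceil].min
# => [8, 50].min => 8 workers, bounded by the processor count
```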

sig do
@@ -32,48 +29,9 @@ def initialize(queue, number_of_workers: nil)
).returns(T::Array[T.type_parameter(:T)])
end
def run_in_parallel(&block)
# If we only have one worker selected, it's not worth forking, just run sequentially
return @queue.map { |item| block.call(item) } if @number_of_workers == 1

read_pipes = []
write_pipes = []

# If we have more than one worker, fork the pool by shifting the expected number of items per worker from the
# queue
workers = (0...@number_of_workers).map do
items = @queue.shift(@items_per_worker)

# Each worker has their own pair of pipes, so that we can read the result from each worker separately
read, write = IO.pipe
read_pipes << read
write_pipes << write

fork do
read.close
result = items.map { |item| block.call(item) }

# Pack the result as a Base64 string of the Marshal dump of the array of values returned by the block that we
# ran in parallel
packed = [Marshal.dump(result)].pack("m")
write.puts(packed)
write.close
end
end

# Close all the write pipes, then read and close from all the read pipes
write_pipes.each(&:close)
result = read_pipes.map do |pipe|
content = pipe.read
pipe.close
content
end

# Wait until all the workers finish. Notice that waiting for the PIDs can only happen after we read and close the
# pipe or else we may end up in a condition where writing to the pipe hangs indefinitely
workers.each { |pid| Process.waitpid(pid) }

# Decode the value back into the Ruby objects by doing the inverse of what each worker does
result.flat_map { |item| T.unsafe(Marshal.load(item.unpack1("m"))) }
# To have the parallel gem run jobs in the parent process, you must pass 0 as the number of processes
number_of_processes = @number_of_workers == 1 ? 0 : @number_of_workers
Parallel.map(@queue, { in_processes: number_of_processes }, &block)
end
end
end
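
This is the heart of the change: instead of pre-splitting the queue into one fixed chunk per forked worker, the executor now delegates to the parallel gem, which hands items to workers as they become free, so one slow item no longer stalls an entire pre-assigned chunk. A minimal sketch of the gem behaviour the new code relies on (assuming parallel >= 1.21.0, the constraint added to the gemspec; the values here are purely illustrative):

```ruby
require "parallel"

queue = (1..10).to_a

# in_processes: 0 disables forking entirely, so the block runs in the calling
# process -- this replaces the old "one worker, run sequentially" fast path.
squares = Parallel.map(queue, in_processes: 0) { |n| n * n }
raise "unexpected result" unless squares == queue.map { |n| n * n }

# With a positive count the gem forks that many worker processes, feeds each
# one the next item as soon as it is idle, and marshals the block's return
# values back to the parent in queue order.
pids = Parallel.map(queue, in_processes: 4) { |_n| Process.pid }
puts pids.uniq.size # up to 4 distinct worker PIDs
```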
15 changes: 2 additions & 13 deletions spec/executor_spec.rb
@@ -11,22 +11,11 @@ class ExecutorSpec < Minitest::Spec
@executor = T.let(Executor.new(@queue), Executor)
end

it "splits the queue into a group per worker" do
received_numbers = []

@executor.run_in_parallel do |number|
received_numbers << number
end

assert_equal(@queue.length, received_numbers.length)
assert_equal(@queue, received_numbers.sort)
end

it "runs sequentially when the number of workers is one" do
executor = Executor.new(@queue, number_of_workers: 1)
parent_pid = Process.pid

executor.run_in_parallel do |_|
executor.run_in_parallel do
assert_equal(parent_pid, Process.pid)
end
end
@@ -35,7 +24,7 @@ class ExecutorSpec < Minitest::Spec
executor = Executor.new(@queue, number_of_workers: 4)
parent_pid = Process.pid

executor.run_in_parallel do |_|
executor.run_in_parallel do
refute_equal(parent_pid, Process.pid)
end
end
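
A small note on the `do |_|` → `do` cleanup in these specs: Ruby blocks have lenient arity, so a block declaring no parameters can still be invoked with one argument per item, which is what both the old executor and `Parallel.map` do. A standalone illustration (not part of the tapioca code):

```ruby
def each_item(items, &block)
  items.each { |item| block.call(item) }
end

# Blocks silently discard surplus arguments, so dropping the unused |_|
# parameter changes nothing about how the tests run.
each_item([1, 2, 3]) { puts "called" }          # ignores the yielded item
each_item([1, 2, 3]) { |item| puts item * 10 }  # uses it
```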
1 change: 1 addition & 0 deletions tapioca.gemspec
@@ -24,6 +24,7 @@ Gem::Specification.new do |spec|
spec.metadata["allowed_push_host"] = "https://rubygems.org"

spec.add_dependency("bundler", ">= 1.17.3")
spec.add_dependency("parallel", ">= 1.21.0")
spec.add_dependency("pry", ">= 0.12.2")
spec.add_dependency("rbi", "~> 0.0.0", ">= 0.0.14")
spec.add_dependency("sorbet-static-and-runtime", ">= 0.5.9204")