Skip to content

Commit

Permalink
Re-implement Executor class with parallel gem and update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
egiurleo committed Apr 8, 2022
1 parent 5a80e9b commit d41c995
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 55 deletions.
46 changes: 4 additions & 42 deletions lib/tapioca/executor.rb
Expand Up @@ -2,6 +2,7 @@
# frozen_string_literal: true

require "etc"
require "parallel"

module Tapioca
class Executor
Expand All @@ -28,48 +29,9 @@ def initialize(queue, number_of_workers: nil)
).returns(T::Array[T.type_parameter(:T)])
end
# Runs the given block once for every item in @queue and returns what the block produced for each item.
#
# NOTE(review): this view is a rendered diff with the +/- markers stripped. The commit header reports
# 4 additions and 42 deletions for this file, which matches the fork/pipe body below (from the
# single-worker early return through the final flat_map) being the removed implementation and the closing
# Parallel.map lines being its replacement. Confirm against the checked-out file before editing.
def run_in_parallel(&block)
# If we only have one worker selected, it's not worth forking, just run sequentially
return @queue.map { |item| block.call(item) } if @number_of_workers == 1

read_pipes = []
write_pipes = []

# If we have more than one worker, fork the pool by shifting the expected number of items per worker from the
# queue
workers = (0...@number_of_workers).map do
# Note that shift removes the items from @queue, so the queue is consumed as workers are created
items = @queue.shift(@items_per_worker)

# Each worker has their own pair of pipes, so that we can read the result from each worker separately
read, write = IO.pipe
read_pipes << read
write_pipes << write

# fork with a block returns the child's PID in the parent, so `workers` ends up as a list of PIDs
fork do
# The child only writes its result, so it closes its copy of the read end right away
read.close
result = items.map { |item| block.call(item) }

# Pack the result as a Base64 string of the Marshal dump of the array of values returned by the block that we
# ran in parallel
packed = [Marshal.dump(result)].pack("m")
write.puts(packed)
write.close
end
end

# Close all the write pipes, then read and close from all the read pipes
write_pipes.each(&:close)
result = read_pipes.map do |pipe|
content = pipe.read
pipe.close
content
end

# Wait until all the workers finish. Notice that waiting for the PIDs can only happen after we read and close the
# pipe or else we may end up in a condition where writing to the pipe hangs indefinitely
workers.each { |pid| Process.waitpid(pid) }

# Decode the value back into the Ruby objects by doing the inverse of what each worker does
# The payload being unmarshalled here was written by the workers forked above, not read from an external source.
# NOTE(review): T.unsafe presumably silences static typing on the untyped Marshal.load result; confirm.
result.flat_map { |item| T.unsafe(Marshal.load(item.unpack1("m"))) }
# To have the parallel gem run jobs in the parent process, you must pass 0 as the number of processes
number_of_processes = @number_of_workers == 1 ? 0 : @number_of_workers
# The caller's block is forwarded unchanged; Parallel.map's return value is the method's return value
Parallel.map(@queue, { in_processes: number_of_processes }, &block)
end
end
end
15 changes: 2 additions & 13 deletions spec/executor_spec.rb
Expand Up @@ -11,22 +11,11 @@ class ExecutorSpec < Minitest::Spec
@executor = T.let(Executor.new(@queue), Executor)
end

# Checks that the block is invoked once for every queued item: the collected numbers must match @queue in
# count and, once sorted, in content (the order in which items arrive is not asserted).
# NOTE(review): the commit header reports 13 deletions for this spec file and this example appears to be
# among them. If the block runs in forked worker processes, the appends to received_numbers would happen
# in the children. Confirm whether the parent can observe them at all.
it "splits the queue into a group per worker" do
received_numbers = []

@executor.run_in_parallel do |number|
received_numbers << number
end

assert_equal(@queue.length, received_numbers.length)
assert_equal(@queue, received_numbers.sort)
end

it "runs sequentially when the number of workers is one" do
executor = Executor.new(@queue, number_of_workers: 1)
parent_pid = Process.pid

executor.run_in_parallel do |_|
executor.run_in_parallel do
assert_equal(parent_pid, Process.pid)
end
end
Expand All @@ -35,7 +24,7 @@ class ExecutorSpec < Minitest::Spec
executor = Executor.new(@queue, number_of_workers: 4)
parent_pid = Process.pid

executor.run_in_parallel do |_|
executor.run_in_parallel do
refute_equal(parent_pid, Process.pid)
end
end
Expand Down

0 comments on commit d41c995

Please sign in to comment.