Skip to content

Commit

Permalink
Re-implement Executor class with parallel gem and update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
egiurleo committed Apr 7, 2022
1 parent e39efff commit 717347d
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 51 deletions.
45 changes: 3 additions & 42 deletions lib/tapioca/executor.rb
Expand Up @@ -2,6 +2,7 @@
# frozen_string_literal: true

require "etc"
require "parallel"

module Tapioca
class Executor
Expand Down Expand Up @@ -32,48 +33,8 @@ def initialize(queue, number_of_workers: nil)
).returns(T::Array[T.type_parameter(:T)])
end
# Runs the given block once for every item on the queue and returns the
# collected results, distributing the work across forked worker processes
# via the `parallel` gem (which handles forking, result marshalling, and
# waiting on children — replacing the previous hand-rolled IO.pipe/fork
# implementation).
#
# When only one worker is requested, forking is pure overhead, so the work
# runs sequentially in the calling process: the parallel gem treats
# `in_processes: 0` as "do not fork, run inline".
#
# @return [Array] the block's return value for each queue item.
def run_in_parallel(&block)
  # 0 processes == run in the current process (no fork) for Parallel.map
  number_of_processes = @number_of_workers == 1 ? 0 : @number_of_workers
  Parallel.map(@queue, { in_processes: number_of_processes }, &block)
end
end
end
15 changes: 6 additions & 9 deletions spec/executor_spec.rb
Expand Up @@ -11,22 +11,19 @@ class ExecutorSpec < Minitest::Spec
@executor = T.let(Executor.new(@queue), Executor)
end

# Each forked worker reports its own PID; counting distinct PIDs counts
# workers. NOTE(review): the expected count assumes the executor defaults
# to one worker per two queue items — confirm against Executor#initialize.
it "creates one worker for every two items on the queue" do
  received_pids = @executor.run_in_parallel do
    Process.pid
  end

  assert_equal(@queue.length / 2, received_pids.uniq.length)
end

# With a single worker no child process is forked, so the block must
# execute in the parent process (same PID as the test itself).
it "runs sequentially when the number of workers is one" do
  executor = Executor.new(@queue, number_of_workers: 1)
  parent_pid = Process.pid

  executor.run_in_parallel do
    assert_equal(parent_pid, Process.pid)
  end
end
Expand All @@ -35,7 +32,7 @@ class ExecutorSpec < Minitest::Spec
executor = Executor.new(@queue, number_of_workers: 4)
parent_pid = Process.pid

executor.run_in_parallel do |_|
executor.run_in_parallel do
refute_equal(parent_pid, Process.pid)
end
end
Expand Down

0 comments on commit 717347d

Please sign in to comment.