From 7aad73c5756a4a361fd6b6de91ffb3029f9d50b6 Mon Sep 17 00:00:00 2001
From: Emily Giurleo
Date: Wed, 6 Apr 2022 11:50:34 -0400
Subject: [PATCH] Re-implement Executor class with parallel gem and update
 tests

---
 lib/tapioca/executor.rb | 45 +++--------------------------------------
 spec/executor_spec.rb   | 15 ++------------
 2 files changed, 5 insertions(+), 55 deletions(-)

diff --git a/lib/tapioca/executor.rb b/lib/tapioca/executor.rb
index c86bfaa05..5e9c685b0 100644
--- a/lib/tapioca/executor.rb
+++ b/lib/tapioca/executor.rb
@@ -2,6 +2,7 @@
 # frozen_string_literal: true
 
 require "etc"
+require "parallel"
 
 module Tapioca
   class Executor
@@ -28,48 +29,8 @@ def initialize(queue, number_of_workers: nil)
       ).returns(T::Array[T.type_parameter(:T)])
     end
     def run_in_parallel(&block)
-      # If we only have one worker selected, it's not worth forking, just run sequentially
-      return @queue.map { |item| block.call(item) } if @number_of_workers == 1
-
-      read_pipes = []
-      write_pipes = []
-
-      # If we have more than one worker, fork the pool by shifting the expected number of items per worker from the
-      # queue
-      workers = (0...@number_of_workers).map do
-        items = @queue.shift(@items_per_worker)
-
-        # Each worker has their own pair of pipes, so that we can read the result from each worker separately
-        read, write = IO.pipe
-        read_pipes << read
-        write_pipes << write
-
-        fork do
-          read.close
-          result = items.map { |item| block.call(item) }
-
-          # Pack the result as a Base64 string of the Marshal dump of the array of values returned by the block that we
-          # ran in parallel
-          packed = [Marshal.dump(result)].pack("m")
-          write.puts(packed)
-          write.close
-        end
-      end
-
-      # Close all the write pipes, then read and close from all the read pipes
-      write_pipes.each(&:close)
-      result = read_pipes.map do |pipe|
-        content = pipe.read
-        pipe.close
-        content
-      end
-
-      # Wait until all the workers finish. Notice that waiting for the PIDs can only happen after we read and close the
-      # pipe or else we may end up in a condition where writing to the pipe hangs indefinitely
-      workers.each { |pid| Process.waitpid(pid) }
-
-      # Decode the value back into the Ruby objects by doing the inverse of what each worker does
-      result.flat_map { |item| T.unsafe(Marshal.load(item.unpack1("m"))) }
+      number_of_processes = @number_of_workers == 1 ? 0 : @number_of_workers
+      Parallel.map(@queue, { in_processes: number_of_processes }, &block)
     end
   end
 end
diff --git a/spec/executor_spec.rb b/spec/executor_spec.rb
index 83bd058da..e1dc36920 100644
--- a/spec/executor_spec.rb
+++ b/spec/executor_spec.rb
@@ -11,22 +11,11 @@ class ExecutorSpec < Minitest::Spec
     @executor = T.let(Executor.new(@queue), Executor)
   end
 
-  it "splits the queue into a group per worker" do
-    received_numbers = []
-
-    @executor.run_in_parallel do |number|
-      received_numbers << number
-    end
-
-    assert_equal(@queue.length, received_numbers.length)
-    assert_equal(@queue, received_numbers.sort)
-  end
-
   it "runs sequentially when the number of workers is one" do
     executor = Executor.new(@queue, number_of_workers: 1)
     parent_pid = Process.pid
 
-    executor.run_in_parallel do |_|
+    executor.run_in_parallel do
       assert_equal(parent_pid, Process.pid)
     end
   end
@@ -35,7 +24,7 @@ class ExecutorSpec < Minitest::Spec
     executor = Executor.new(@queue, number_of_workers: 4)
     parent_pid = Process.pid
 
-    executor.run_in_parallel do |_|
+    executor.run_in_parallel do
       refute_equal(parent_pid, Process.pid)
     end
   end