From e220eb991e03ab124d781abcf4d44a225bf59687 Mon Sep 17 00:00:00 2001 From: Michael Grosser Date: Wed, 13 Dec 2023 18:31:24 -0800 Subject: [PATCH] polish --- Readme.md | 7 +++---- lib/parallel.rb | 36 +++++++++++++++++------------------ spec/cases/finish_in_order.rb | 7 ++++--- spec/parallel_spec.rb | 18 ++++++++++++------ 4 files changed, 37 insertions(+), 31 deletions(-) diff --git a/Readme.md b/Readme.md index 5ceb2ef..76feb52 100644 --- a/Readme.md +++ b/Readme.md @@ -150,22 +150,21 @@ Parallel.map(1..50, progress: "Doing stuff") { sleep 1 } Use `:finish` or `:start` hook to get progress information. - `:start` has item and index - - `:finish` has item, index, result + - `:finish` has item, index, and result They are called on the main process and protected with a mutex. +(To just get the index, use the more performant `Parallel.each_with_index`) ```Ruby Parallel.map(1..100, finish: -> (item, i, result) { ... do something ... }) { sleep 1 } ``` -Set `finish_in_order: true` to ensure the `:finish` hooks get called in the order of the items. +Set `finish_in_order: true` to call the `:finish` hook in the order of the input (will take longer to see initial output). ```Ruby Parallel.map(1..9, finish: -> (item, i, result) { puts "#{item} ok" }, finish_in_order: true) { sleep rand } ``` -_NOTE: If all you are trying to do is get the index, it is much more performant to use `each_with_index` instead._ - ### Worker number Use `Parallel.worker_number` to determine the worker slot in which your diff --git a/lib/parallel.rb b/lib/parallel.rb index ece6c09..ee83a07 100644 --- a/lib/parallel.rb +++ b/lib/parallel.rb @@ -640,30 +640,30 @@ def with_instrumentation(item, index, options) end def instrument_finish(item, index, result, options) - return unless on_finish = options[:finish] + return unless (on_finish = options[:finish]) return instrument_finish_in_order(item, index, result, options) if options[:finish_in_order] options[:mutex].synchronize { on_finish.call(item, index, result) } end + # yield results in the order of the input items + # needs to use `options` to store state between executions + # needs to use `done` index since a nil result would also be valid def instrument_finish_in_order(item, index, result, options) options[:mutex].synchronize do - options[:finish_items] ||= [] - options[:finish_items_done] ||= [] - options[:finish_index] ||= 0 - if index == options[:finish_index] - # call finish for current item and any ready items - options[:finish].call(item, index, result) - options[:finish_index] += 1 - (index + 1).upto(options[:finish_items].size).each do |old_index| - break unless options[:finish_items_done][old_index] - old_item = options[:finish_items][old_index] - options[:finish].call(old_item, old_index, result) - options[:finish_index] += 1 - end - else - # store for later - options[:finish_items][index] = item - options[:finish_items_done][index] = true + # initialize our state + options[:finish_done] ||= [] + options[:finish_expecting] ||= 0 # we wait for item at index 0 + + # store current result + options[:finish_done][index] = [item, result] + + # yield all results that are now in order + break unless index == options[:finish_expecting] + index.upto(options[:finish_done].size).each do |i| + break unless (done = options[:finish_done][i]) + options[:finish_done][i] = nil # allow GC to free this item and result + options[:finish].call(done[0], i, done[1]) + options[:finish_expecting] += 1 end end end diff --git a/spec/cases/finish_in_order.rb b/spec/cases/finish_in_order.rb index 7ed418d..b8c0339 100644 --- a/spec/cases/finish_in_order.rb +++ b/spec/cases/finish_in_order.rb @@ -3,8 +3,9 @@ require './spec/cases/helper' class Callback - def self.call(_item) + def self.call(item) sleep rand * 0.01 + item.is_a?(Numeric) ? "F#{item}" : item end end @@ -12,8 +13,8 @@ def self.call(_item) in_worker_type = "in_#{ENV.fetch('WORKER_TYPE')}".to_sym $stdout.sync = true -items = 1..9 -finish = ->(item, _index, _result) { puts "finish #{item}" } +items = [nil, false, 2, 3, 4] +finish = ->(item, index, result) { puts "finish #{item.inspect} #{index} #{result.inspect}" } options = { in_worker_type => 4, finish: finish, finish_in_order: true } if in_worker_type == :in_ractors Parallel.public_send(method, items, options.merge(ractor: [Callback, :call])) diff --git a/spec/parallel_spec.rb b/spec/parallel_spec.rb index 400283e..23093d5 100644 --- a/spec/parallel_spec.rb +++ b/spec/parallel_spec.rb @@ -341,12 +341,23 @@ def cpus Parallel.worker_number.should be_nil end - it "can run with 0" do + it "can run with 0 by not using #{type}" do Thread.should_not_receive(:exclusive) Process.should_not_receive(:fork) result = Parallel.map([1, 2, 3, 4, 5, 6, 7, 8, 9], "in_#{type}".to_sym => 0) { |x| x + 2 } result.should == [3, 4, 5, 6, 7, 8, 9, 10, 11] end + + it "can call finish hook in order #{type}" do + out = `METHOD=map WORKER_TYPE=#{type} ruby spec/cases/finish_in_order.rb 2>&1` + without_ractor_warning(out).should == <<~OUT + finish nil 0 nil + finish false 1 false + finish 2 2 "F2" + finish 3 3 "F3" + finish 4 4 "F4" + OUT + end end it "notifies when an item of work is dispatched to a worker process" do @@ -653,11 +664,6 @@ def cpus end end - it "calls finish hook with finish_in_order: true" do - out = `METHOD=each WORKER_TYPE=#{type} ruby spec/cases/finish_in_order.rb 2>&1` - without_ractor_warning(out).should == (1..9).map { |item| "finish #{item}\n" }.join - end - it "sets Parallel.worker_number with #{type}" do skip "unsupported" if type == "ractors" out = `METHOD=each WORKER_TYPE=#{type} ruby spec/cases/with_worker_number.rb 2>&1`