Skip to content

Commit

Permalink
Merge pull request #340 from grosser/grosser/order
Browse files Browse the repository at this point in the history
polish finish_in_order
  • Loading branch information
grosser committed Dec 16, 2023
2 parents 6cd55df + e220eb9 commit 792a1fe
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 31 deletions.
7 changes: 3 additions & 4 deletions Readme.md
Expand Up @@ -150,22 +150,21 @@ Parallel.map(1..50, progress: "Doing stuff") { sleep 1 }

Use `:finish` or `:start` hook to get progress information.
- `:start` has item and index
- `:finish` has item, index, result
- `:finish` has item, index, and result

They are called on the main process and protected with a mutex.
(To just get the index, use the more performant `Parallel.each_with_index`)

```Ruby
Parallel.map(1..100, finish: -> (item, i, result) { ... do something ... }) { sleep 1 }
```

Set `finish_in_order: true` to ensure the `:finish` hooks get called in the order of the items.
Set `finish_in_order: true` to call the `:finish` hook in the order of the input (will take longer to see initial output).

```Ruby
Parallel.map(1..9, finish: -> (item, i, result) { puts "#{item} ok" }, finish_in_order: true) { sleep rand }
```

_NOTE: If all you are trying to do is get the index, it is much more performant to use `each_with_index` instead._

### Worker number

Use `Parallel.worker_number` to determine the worker slot in which your
Expand Down
36 changes: 18 additions & 18 deletions lib/parallel.rb
Expand Up @@ -640,30 +640,30 @@ def with_instrumentation(item, index, options)
end

def instrument_finish(item, index, result, options)
return unless on_finish = options[:finish]
return unless (on_finish = options[:finish])
return instrument_finish_in_order(item, index, result, options) if options[:finish_in_order]
options[:mutex].synchronize { on_finish.call(item, index, result) }
end

# yield results in the order of the input items
# needs to use `options` to store state between executions
# needs to use `done` index since a nil result would also be valid
def instrument_finish_in_order(item, index, result, options)
options[:mutex].synchronize do
options[:finish_items] ||= []
options[:finish_items_done] ||= []
options[:finish_index] ||= 0
if index == options[:finish_index]
# call finish for current item and any ready items
options[:finish].call(item, index, result)
options[:finish_index] += 1
(index + 1).upto(options[:finish_items].size).each do |old_index|
break unless options[:finish_items_done][old_index]
old_item = options[:finish_items][old_index]
options[:finish].call(old_item, old_index, result)
options[:finish_index] += 1
end
else
# store for later
options[:finish_items][index] = item
options[:finish_items_done][index] = true
# initialize our state
options[:finish_done] ||= []
options[:finish_expecting] ||= 0 # we wait for item at index 0

# store current result
options[:finish_done][index] = [item, result]

# yield all results that are now in order
break unless index == options[:finish_expecting]
index.upto(options[:finish_done].size).each do |i|
break unless (done = options[:finish_done][i])
options[:finish_done][i] = nil # allow GC to free this item and result
options[:finish].call(done[0], i, done[1])
options[:finish_expecting] += 1
end
end
end
Expand Down
7 changes: 4 additions & 3 deletions spec/cases/finish_in_order.rb
Expand Up @@ -3,17 +3,18 @@
require './spec/cases/helper'

class Callback
def self.call(_item)
def self.call(item)
sleep rand * 0.01
item.is_a?(Numeric) ? "F#{item}" : item
end
end

method = ENV.fetch('METHOD')
in_worker_type = "in_#{ENV.fetch('WORKER_TYPE')}".to_sym
$stdout.sync = true

items = 1..9
finish = ->(item, _index, _result) { puts "finish #{item}" }
items = [nil, false, 2, 3, 4]
finish = ->(item, index, result) { puts "finish #{item.inspect} #{index} #{result.inspect}" }
options = { in_worker_type => 4, finish: finish, finish_in_order: true }
if in_worker_type == :in_ractors
Parallel.public_send(method, items, options.merge(ractor: [Callback, :call]))
Expand Down
18 changes: 12 additions & 6 deletions spec/parallel_spec.rb
Expand Up @@ -341,12 +341,23 @@ def cpus
Parallel.worker_number.should be_nil
end

it "can run with 0" do
it "can run with 0 by not using #{type}" do
Thread.should_not_receive(:exclusive)
Process.should_not_receive(:fork)
result = Parallel.map([1, 2, 3, 4, 5, 6, 7, 8, 9], "in_#{type}".to_sym => 0) { |x| x + 2 }
result.should == [3, 4, 5, 6, 7, 8, 9, 10, 11]
end

it "can call finish hook in order #{type}" do
out = `METHOD=map WORKER_TYPE=#{type} ruby spec/cases/finish_in_order.rb 2>&1`
without_ractor_warning(out).should == <<~OUT
finish nil 0 nil
finish false 1 false
finish 2 2 "F2"
finish 3 3 "F3"
finish 4 4 "F4"
OUT
end
end

it "notifies when an item of work is dispatched to a worker process" do
Expand Down Expand Up @@ -653,11 +664,6 @@ def cpus
end
end

it "calls finish hook with finish_in_order: true" do
out = `METHOD=each WORKER_TYPE=#{type} ruby spec/cases/finish_in_order.rb 2>&1`
without_ractor_warning(out).should == (1..9).map { |item| "finish #{item}\n" }.join
end

it "sets Parallel.worker_number with #{type}" do
skip "unsupported" if type == "ractors"
out = `METHOD=each WORKER_TYPE=#{type} ruby spec/cases/with_worker_number.rb 2>&1`
Expand Down

0 comments on commit 792a1fe

Please sign in to comment.