Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add :finish_in_order option #339

Merged
merged 1 commit into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,12 @@ They are called on the main process and protected with a mutex.
Parallel.map(1..100, finish: -> (item, i, result) { ... do something ... }) { sleep 1 }
```

Set `finish_in_order: true` to ensure the `:finish` hooks get called in the order of the items.

```Ruby
Parallel.map(1..9, finish: -> (item, i, result) { puts "#{item} ok" }, finish_in_order: true) { sleep rand }
```

_NOTE: If all you are trying to do is get the index, it is much more performant to use `each_with_index` instead._

### Worker number
Expand Down
24 changes: 24 additions & 0 deletions lib/parallel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -641,9 +641,33 @@ def with_instrumentation(item, index, options)

def instrument_finish(item, index, result, options)
return unless on_finish = options[:finish]
return instrument_finish_in_order(item, index, result, options) if options[:finish_in_order]
options[:mutex].synchronize { on_finish.call(item, index, result) }
end

def instrument_finish_in_order(item, index, result, options)
options[:mutex].synchronize do
options[:finish_items] ||= []
options[:finish_items_done] ||= []
options[:finish_index] ||= 0
Comment on lines +650 to +652
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can these be local variables ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can't use local variables, as they need to maintain the state between invocations.
As they are being called via a class method, it can't use instance variables either.
The options are dup'ed so works even when invoked multiple times with the same options

if index == options[:finish_index]
# call finish for current item and any ready items
options[:finish].call(item, index, result)
options[:finish_index] += 1
Comment on lines +655 to +656
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can do this in the loop to not have the code twice ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To do it in the loop, you would have to store the item first, e.g. something like this:
This is a bit more readable, but uses a tiny bit more memory.

    def instrument_finish_in_order(item, index, result, options)
      options[:mutex].synchronize do
        options[:finish_items] ||= []
        options[:finish_items_done] ||= []
        options[:finish_index] ||= 0
        options[:finish_items][index] = item
        options[:finish_items_done][index] = true
        break unless index == options[:finish_index]
        index.upto(options[:finish_items].size).each do |old_index|
          break unless options[:finish_items_done][old_index]
          old_item = options[:finish_items][old_index]
          options[:finish].call(old_item, old_index, result)
          options[:finish_index] += 1
        end
      end
    end

Let me know if you prefer this version and I'll update the PR

(index + 1).upto(options[:finish_items].size).each do |old_index|
break unless options[:finish_items_done][old_index]
old_item = options[:finish_items][old_index]
options[:finish].call(old_item, old_index, result)
options[:finish_index] += 1
end
else
# store for later
options[:finish_items][index] = item
options[:finish_items_done][index] = true
end
end
end

def instrument_start(item, index, options)
return unless on_start = options[:start]
options[:mutex].synchronize { on_start.call(item, index) }
Expand Down
22 changes: 22 additions & 0 deletions spec/cases/finish_in_order.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# frozen_string_literal: true

require './spec/cases/helper'

class Callback
def self.call(_item)
sleep rand * 0.01
end
end

method = ENV.fetch('METHOD')
in_worker_type = "in_#{ENV.fetch('WORKER_TYPE')}".to_sym
$stdout.sync = true

items = 1..9
finish = ->(item, _index, _result) { puts "finish #{item}" }
options = { in_worker_type => 4, finish: finish, finish_in_order: true }
if in_worker_type == :in_ractors
Parallel.public_send(method, items, options.merge(ractor: [Callback, :call]))
else
Parallel.public_send(method, items, options) { |item| Callback.call(item) }
end
5 changes: 5 additions & 0 deletions spec/parallel_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,11 @@ def cpus
end
end

it "calls finish hook with finish_in_order: true" do
out = `METHOD=each WORKER_TYPE=#{type} ruby spec/cases/finish_in_order.rb 2>&1`
without_ractor_warning(out).should == (1..9).map { |item| "finish #{item}\n" }.join
end

it "sets Parallel.worker_number with #{type}" do
skip "unsupported" if type == "ractors"
out = `METHOD=each WORKER_TYPE=#{type} ruby spec/cases/with_worker_number.rb 2>&1`
Expand Down