Skip to content

Commit

Permalink
Merge pull request #285 from grosser/grosser/break
Browse files Browse the repository at this point in the history
allow breaking with value
  • Loading branch information
grosser committed Nov 7, 2020
2 parents 6d2a85f + 4c16b35 commit ea04bff
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 32 deletions.
9 changes: 6 additions & 3 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ items = [1,2,3]
Parallel.each( -> { items.pop || Parallel::Stop }) { |number| ... }
```

You can also call `any?` or `all?`, which work the same way as `Array#any?` and `Array#all?`.
Also supports `any?` or `all?`

```Ruby
Parallel.any?([1,2,3,4,5,6,7]) { |number| number == 4 }
Expand All @@ -53,7 +53,6 @@ Parallel.all?([1,2,nil,4,5]) { |number| number != nil }
# => false
```


Processes/Threads are workers, they grab the next piece of work when they finish.

### Processes
Expand Down Expand Up @@ -106,11 +105,15 @@ To fix, autoloaded classes before the parallel block with either `require '<mode
### Break

```Ruby
Parallel.map(User.all) do |user|
Parallel.map([1, 2, 3]) do |i|
raise Parallel::Break # -> stops after all current items are finished
end
```

```Ruby
Parallel.map([1, 2, 3]) { |i| raise Parallel::Break, i if i == 2 } == 2
```

### Kill

Only use if whatever is executing in the sub-command is safe to kill at any point
Expand Down
55 changes: 27 additions & 28 deletions lib/parallel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@
require 'parallel/processor_count'

module Parallel
extend Parallel::ProcessorCount
extend ProcessorCount

Stop = Object.new.freeze

class DeadWorker < StandardError
end

class Break < StandardError
attr_reader :value
def initialize(value = nil)
@value = value
end
end

class Kill < StandardError
class Kill < Break
end

class UndumpableException < StandardError
Expand All @@ -22,8 +28,6 @@ def initialize(original)
end
end

Stop = Object.new.freeze

class ExceptionWrapper
attr_reader :exception
def initialize(exception)
Expand Down Expand Up @@ -102,7 +106,7 @@ def next
item, index = @mutex.synchronize do
return if @stopped
item = @lambda.call
@stopped = (item == Parallel::Stop)
@stopped = (item == Stop)
return if @stopped
[item, @index += 1]
end
Expand Down Expand Up @@ -230,12 +234,12 @@ def each(array, options={}, &block)

def any?(*args, &block)
raise "You must provide a block when calling #any?" if block.nil?
!each(*args) { |*a| raise Parallel::Kill if block.call(*a) }
!each(*args) { |*a| raise Kill if block.call(*a) }
end

def all?(*args, &block)
raise "You must provide a block when calling #all?" if block.nil?
!!each(*args) { |*a| raise Parallel::Kill unless block.call(*a) }
!!each(*args) { |*a| raise Kill unless block.call(*a) }
end

def each_with_index(array, options={}, &block)
Expand Down Expand Up @@ -270,16 +274,18 @@ def map(source, options = {}, &block)
options[:return_results] = (options[:preserve_results] != false || !!options[:finish])
add_progress_bar!(job_factory, options)

results = if size == 0
work_direct(job_factory, options, &block)
elsif method == :in_threads
work_in_threads(job_factory, options.merge(:count => size), &block)
else
work_in_processes(job_factory, options.merge(:count => size), &block)
end
if results
options[:return_results] ? results : source
end
result =
if size == 0
work_direct(job_factory, options, &block)
elsif method == :in_threads
work_in_threads(job_factory, options.merge(:count => size), &block)
else
work_in_processes(job_factory, options.merge(:count => size), &block)
end

return result.value if result.is_a?(Break)
raise result if result.is_a?(Exception)
options[:return_results] ? result : source
end

def map_with_index(array, options={}, &block)
Expand Down Expand Up @@ -340,7 +346,7 @@ def work_direct(job_factory, options, &block)
rescue
exception = $!
end
handle_exception(exception, results)
exception || results
ensure
self.worker_number = nil
end
Expand All @@ -367,7 +373,7 @@ def work_in_threads(job_factory, options, &block)
end
end

handle_exception(exception, results)
exception || results
end

def work_in_processes(job_factory, options, &blk)
Expand Down Expand Up @@ -401,7 +407,7 @@ def work_in_processes(job_factory, options, &blk)
results_mutex.synchronize { results[index] = result } # arrays are not threads safe on jRuby
rescue StandardError => e
exception = e
if Parallel::Kill === exception
if Kill === exception
(workers - [worker]).each do |w|
w.thread.kill if w.thread
UserInterruptHandler.kill(w.pid)
Expand All @@ -414,8 +420,7 @@ def work_in_processes(job_factory, options, &blk)
end
end
end

handle_exception(exception, results)
exception || results
end

def replace_worker(job_factory, workers, i, options, blk)
Expand Down Expand Up @@ -484,12 +489,6 @@ def process_incoming_jobs(read, write, job_factory, options, &block)
end
end

def handle_exception(exception, results)
return nil if [Parallel::Break, Parallel::Kill].include? exception.class
raise exception if exception
results
end

# options is either a Integer or a Hash with :count
def extract_count_from_options(options)
if options.is_a?(Hash)
Expand Down
2 changes: 1 addition & 1 deletion spec/cases/with_break.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
result = Parallel.public_send(method, 1..100, in_worker_type => worker_size) do |x|
sleep 0.1 # so all workers get started
print x
raise Parallel::Break if x == 1
raise Parallel::Break, *ARGV if x == 1
sleep 0.2 # so now no work gets queued before Parallel::Break is raised
x
end
Expand Down
4 changes: 4 additions & 0 deletions spec/parallel_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,10 @@ def cpus
`METHOD=map WORKER_TYPE=#{type} ruby spec/cases/with_exception_in_start_before_finish.rb 2>&1`.should == '3 called'
end

it "can return from break with #{type}" do
`METHOD=map WORKER_TYPE=#{type} ruby spec/cases/with_break.rb hi 2>&1`.should =~ /^\d{4} Parallel::Break raised - result "hi"$/
end

it "sets Parallel.worker_number with 4 #{type}" do
out = `METHOD=map WORKER_TYPE=#{type} ruby spec/cases/with_worker_number.rb 2>&1`
out.should =~ /\A[0123]+\z/
Expand Down

0 comments on commit ea04bff

Please sign in to comment.