diff --git a/lib/concurrent-ruby/concurrent/async.rb b/lib/concurrent-ruby/concurrent/async.rb index 8fd4886cd..0e99b638d 100644 --- a/lib/concurrent-ruby/concurrent/async.rb +++ b/lib/concurrent-ruby/concurrent/async.rb @@ -309,6 +309,7 @@ def initialize(delegate) @delegate = delegate @queue = [] @executor = Concurrent.global_io_executor + @ruby_pid = $$ end # Delegates method calls to the wrapped object. @@ -326,6 +327,7 @@ def method_missing(method, *args, &block) ivar = Concurrent::IVar.new synchronize do + reset_if_forked @queue.push [ivar, method, args, block] @executor.post { perform } if @queue.length == 1 end @@ -361,6 +363,13 @@ def perform end end end + + def reset_if_forked + if $$ != @ruby_pid + @queue.clear + @ruby_pid = $$ + end + end end private_constant :AsyncDelegator diff --git a/spec/concurrent/async_spec.rb b/spec/concurrent/async_spec.rb index 18ea0dc87..3cd22d9b9 100644 --- a/spec/concurrent/async_spec.rb +++ b/spec/concurrent/async_spec.rb @@ -296,5 +296,18 @@ def gather(seconds, first, *rest) expect(object.bucket).to eq [:a, :b, :c, :d] end end + + context 'fork safety' do + it 'does not hang when forked' do + skip "Platform does not support fork" unless Process.respond_to?(:fork) + object = Class.new { + include Concurrent::Async + def foo; end + }.new + object.async.foo + _, status = Process.waitpid2(fork {object.await.foo}) + expect(status.exitstatus).to eq 0 + end + end end end