From 3035ec512dcf7542d2d3f8b9a4132f496c25a0fa Mon Sep 17 00:00:00 2001 From: Will Jordan Date: Tue, 30 Jun 2020 20:11:43 -0700 Subject: [PATCH] Reset Async queue on fork --- lib/concurrent-ruby/concurrent/async.rb | 9 +++++++++ spec/concurrent/async_spec.rb | 13 +++++++++++++ 2 files changed, 22 insertions(+) diff --git a/lib/concurrent-ruby/concurrent/async.rb b/lib/concurrent-ruby/concurrent/async.rb index 8fd4886cd..0e99b638d 100644 --- a/lib/concurrent-ruby/concurrent/async.rb +++ b/lib/concurrent-ruby/concurrent/async.rb @@ -309,6 +309,7 @@ def initialize(delegate) @delegate = delegate @queue = [] @executor = Concurrent.global_io_executor + @ruby_pid = $$ end # Delegates method calls to the wrapped object. @@ -326,6 +327,7 @@ def method_missing(method, *args, &block) ivar = Concurrent::IVar.new synchronize do + reset_if_forked @queue.push [ivar, method, args, block] @executor.post { perform } if @queue.length == 1 end @@ -361,6 +363,13 @@ def perform end end end + + def reset_if_forked + if $$ != @ruby_pid + @queue.clear + @ruby_pid = $$ + end + end end private_constant :AsyncDelegator diff --git a/spec/concurrent/async_spec.rb b/spec/concurrent/async_spec.rb index 18ea0dc87..3cd22d9b9 100644 --- a/spec/concurrent/async_spec.rb +++ b/spec/concurrent/async_spec.rb @@ -296,5 +296,18 @@ def gather(seconds, first, *rest) expect(object.bucket).to eq [:a, :b, :c, :d] end end + + context 'fork safety' do + it 'does not hang when forked' do + skip "Platform does not support fork" unless Process.respond_to?(:fork) + object = Class.new { + include Concurrent::Async + def foo; end + }.new + object.async.foo + _, status = Process.waitpid2(fork {object.await.foo}) + expect(status.exitstatus).to eq 0 + end + end end end