From dd2d8cf6fc5c85c904d2892de4d89eed311b995a Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Fri, 4 Jun 2021 19:38:53 +0200 Subject: [PATCH 1/3] Fix Set thread safety Co-authored-by: Mike Dalessio Co-authored-by: Farid Zakaria --- lib/concurrent-ruby/concurrent/set.rb | 20 +++++-- .../thread_safe/util/data_structures.rb | 25 ++++++++ spec/concurrent/set_spec.rb | 59 ++++++++++++++++++- 3 files changed, 97 insertions(+), 7 deletions(-) diff --git a/lib/concurrent-ruby/concurrent/set.rb b/lib/concurrent-ruby/concurrent/set.rb index 602d49408..3bf0c895c 100644 --- a/lib/concurrent-ruby/concurrent/set.rb +++ b/lib/concurrent-ruby/concurrent/set.rb @@ -19,13 +19,19 @@ module Concurrent # # @see http://ruby-doc.org/stdlib-2.4.0/libdoc/set/rdoc/Set.html Ruby standard library `Set` - # @!macro internal_implementation_note SetImplementation = case when Concurrent.on_cruby? - # Because MRI never runs code in parallel, the existing - # non-thread-safe structures should usually work fine. - ::Set + # The CRuby implementation of Set is written in Ruby itself and is + # not thread safe for certain methods. + require 'monitor' + require 'concurrent/thread_safe/util/data_structures' + + class CRubySet < ::Set + end + + ThreadSafe::Util.make_synchronized_on_cruby CRubySet + CRubySet when Concurrent.on_jruby? require 'jruby/synchronized' @@ -33,6 +39,7 @@ module Concurrent class JRubySet < ::Set include JRuby::Synchronized end + JRubySet when Concurrent.on_rbx? @@ -41,7 +48,8 @@ class JRubySet < ::Set class RbxSet < ::Set end - ThreadSafe::Util.make_synchronized_on_rbx Concurrent::RbxSet + + ThreadSafe::Util.make_synchronized_on_rbx RbxSet RbxSet when Concurrent.on_truffleruby? @@ -50,7 +58,7 @@ class RbxSet < ::Set class TruffleRubySet < ::Set end - ThreadSafe::Util.make_synchronized_on_truffleruby Concurrent::TruffleRubySet + ThreadSafe::Util.make_synchronized_on_truffleruby TruffleRubySet TruffleRubySet else diff --git a/lib/concurrent-ruby/concurrent/thread_safe/util/data_structures.rb b/lib/concurrent-ruby/concurrent/thread_safe/util/data_structures.rb index ff1e8ed97..de39abaab 100644 --- a/lib/concurrent-ruby/concurrent/thread_safe/util/data_structures.rb +++ b/lib/concurrent-ruby/concurrent/thread_safe/util/data_structures.rb @@ -12,6 +12,31 @@ def self.synchronized(object, &block) module Concurrent module ThreadSafe module Util + def self.make_synchronized_on_cruby(klass) + klass.class_eval do + def initialize(*args, &block) + @_monitor = Monitor.new + super + end + + def initialize_copy(other) + # make sure a copy is not sharing a monitor with the original object! + @_monitor = Monitor.new + super + end + end + + klass.superclass.instance_methods(false).each do |method| + klass.class_eval <<-RUBY, __FILE__, __LINE__ + 1 + def #{method}(*args) + monitor = @_monitor + monitor or raise("BUG: Internal monitor was not properly initialized. Please report this to the concurrent-ruby developers.") + monitor.synchronize { super } + end + RUBY + end + end + def self.make_synchronized_on_rbx(klass) klass.class_eval do private diff --git a/spec/concurrent/set_spec.rb b/spec/concurrent/set_spec.rb index 7c69b4bec..558801864 100644 --- a/spec/concurrent/set_spec.rb +++ b/spec/concurrent/set_spec.rb @@ -42,7 +42,7 @@ module Concurrent end context 'concurrency' do - it do + it '#add and #delete' do (1..Concurrent::ThreadSafe::Test::THREADS).map do |i| in_thread do 1000.times do @@ -55,6 +55,63 @@ module Concurrent end.map(&:join) expect(set).to be_empty end + + it 'force context switch' do + barrier = Concurrent::CyclicBarrier.new(2) + + # methods like include? or delete? are implemented for CRuby in Ruby itself + # @see https://github.com/ruby/ruby/blob/master/lib/set.rb + set.clear + + # add a single element + set.add(1) + + # This thread should start and `Set#reject!` in CRuby should cache a value of `0` for size + thread_reject = in_thread do + # we expect this to return nil since nothing should have changed. + expect(set.reject! do |v| + barrier.wait + v == 1 # only delete the 1 value + end).to eq set + end + + thread_add = in_thread do + barrier.wait + expect(set.add?(1)).to eq set + end + + join_with [thread_reject, thread_add] + end + + it '#each' do + threads = [] + ("a".."z").inject(set, &:<<) # setup a non-empty set + + threads << in_thread do + 2000.times do + size = nil + set.each do |member| + if size.nil? + size = set.length + else + expect(set.length).to eq(size) + end + end + end + end + + threads += (1..19).map do |i| + in_thread do + v = i * 1000 + 10.times do + 200.times { |j| set << (v+j) } + 200.times { |j| set.delete(v+j) } + end + end + end + + threads.map(&:join) + end end end end From 8fa5b953d05d9bdbbaff43c08abe05f4cfefd1e8 Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Fri, 4 Jun 2021 19:39:18 +0200 Subject: [PATCH 2/3] fix formatting --- lib/concurrent-ruby/concurrent/version.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/concurrent-ruby/concurrent/version.rb b/lib/concurrent-ruby/concurrent/version.rb index 8d9dde235..af9e5a77f 100644 --- a/lib/concurrent-ruby/concurrent/version.rb +++ b/lib/concurrent-ruby/concurrent/version.rb @@ -1,3 +1,3 @@ module Concurrent - VERSION = '1.1.8' + VERSION = '1.1.8' end From 7e1dc828807685a3d13fc5008770166626f7d58f Mon Sep 17 00:00:00 2001 From: Petr Chalupa Date: Fri, 4 Jun 2021 19:40:10 +0200 Subject: [PATCH 3/3] Avoid warning about uninitialized ivar Co-authored-by: Farid Zakaria --- .../concurrent/thread_safe/util/data_structures.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/concurrent-ruby/concurrent/thread_safe/util/data_structures.rb b/lib/concurrent-ruby/concurrent/thread_safe/util/data_structures.rb index de39abaab..24d039b2f 100644 --- a/lib/concurrent-ruby/concurrent/thread_safe/util/data_structures.rb +++ b/lib/concurrent-ruby/concurrent/thread_safe/util/data_structures.rb @@ -42,7 +42,7 @@ def self.make_synchronized_on_rbx(klass) private def _mon_initialize - @_monitor = Monitor.new unless @_monitor # avoid double initialisation + @_monitor ||= Monitor.new # avoid double initialisation end def self.new(*args)