diff --git a/lib/concurrent-ruby/concurrent/set.rb b/lib/concurrent-ruby/concurrent/set.rb index 602d49408..3bf0c895c 100644 --- a/lib/concurrent-ruby/concurrent/set.rb +++ b/lib/concurrent-ruby/concurrent/set.rb @@ -19,13 +19,19 @@ module Concurrent # # @see http://ruby-doc.org/stdlib-2.4.0/libdoc/set/rdoc/Set.html Ruby standard library `Set` - # @!macro internal_implementation_note SetImplementation = case when Concurrent.on_cruby? - # Because MRI never runs code in parallel, the existing - # non-thread-safe structures should usually work fine. - ::Set + # The CRuby implementation of Set is written in Ruby itself and is + # not thread safe for certain methods. + require 'monitor' + require 'concurrent/thread_safe/util/data_structures' + + class CRubySet < ::Set + end + + ThreadSafe::Util.make_synchronized_on_cruby CRubySet + CRubySet when Concurrent.on_jruby? require 'jruby/synchronized' @@ -33,6 +39,7 @@ module Concurrent class JRubySet < ::Set include JRuby::Synchronized end + JRubySet when Concurrent.on_rbx? @@ -41,7 +48,8 @@ class JRubySet < ::Set class RbxSet < ::Set end - ThreadSafe::Util.make_synchronized_on_rbx Concurrent::RbxSet + + ThreadSafe::Util.make_synchronized_on_rbx RbxSet RbxSet when Concurrent.on_truffleruby? @@ -50,7 +58,7 @@ class RbxSet < ::Set class TruffleRubySet < ::Set end - ThreadSafe::Util.make_synchronized_on_truffleruby Concurrent::TruffleRubySet + ThreadSafe::Util.make_synchronized_on_truffleruby TruffleRubySet TruffleRubySet else diff --git a/lib/concurrent-ruby/concurrent/thread_safe/util/data_structures.rb b/lib/concurrent-ruby/concurrent/thread_safe/util/data_structures.rb index ff1e8ed97..24d039b2f 100644 --- a/lib/concurrent-ruby/concurrent/thread_safe/util/data_structures.rb +++ b/lib/concurrent-ruby/concurrent/thread_safe/util/data_structures.rb @@ -12,12 +12,37 @@ def self.synchronized(object, &block) module Concurrent module ThreadSafe module Util + def self.make_synchronized_on_cruby(klass) + klass.class_eval do + def initialize(*args, &block) + @_monitor = Monitor.new + super + end + + def initialize_copy(other) + # make sure a copy is not sharing a monitor with the original object! + @_monitor = Monitor.new + super + end + end + + klass.superclass.instance_methods(false).each do |method| + klass.class_eval <<-RUBY, __FILE__, __LINE__ + 1 + def #{method}(*args) + monitor = @_monitor + monitor or raise("BUG: Internal monitor was not properly initialized. Please report this to the concurrent-ruby developers.") + monitor.synchronize { super } + end + RUBY + end + end + def self.make_synchronized_on_rbx(klass) klass.class_eval do private def _mon_initialize - @_monitor = Monitor.new unless @_monitor # avoid double initialisation + @_monitor ||= Monitor.new # avoid double initialisation end def self.new(*args) diff --git a/lib/concurrent-ruby/concurrent/version.rb b/lib/concurrent-ruby/concurrent/version.rb index 8d9dde235..af9e5a77f 100644 --- a/lib/concurrent-ruby/concurrent/version.rb +++ b/lib/concurrent-ruby/concurrent/version.rb @@ -1,3 +1,3 @@ module Concurrent - VERSION = '1.1.8' + VERSION = '1.1.8' end diff --git a/spec/concurrent/set_spec.rb b/spec/concurrent/set_spec.rb index 7c69b4bec..558801864 100644 --- a/spec/concurrent/set_spec.rb +++ b/spec/concurrent/set_spec.rb @@ -42,7 +42,7 @@ module Concurrent end context 'concurrency' do - it do + it '#add and #delete' do (1..Concurrent::ThreadSafe::Test::THREADS).map do |i| in_thread do 1000.times do @@ -55,6 +55,63 @@ module Concurrent end.map(&:join) expect(set).to be_empty end + + it 'force context switch' do + barrier = Concurrent::CyclicBarrier.new(2) + + # methods like include? or delete? are implemented for CRuby in Ruby itself + # @see https://github.com/ruby/ruby/blob/master/lib/set.rb + set.clear + + # add a single element + set.add(1) + + # This thread should start and `Set#reject!` in CRuby should cache a value of `0` for size + thread_reject = in_thread do + # we expect this to return nil since nothing should have changed. + expect(set.reject! do |v| + barrier.wait + v == 1 # only delete the 1 value + end).to eq set + end + + thread_add = in_thread do + barrier.wait + expect(set.add?(1)).to eq set + end + + join_with [thread_reject, thread_add] + end + + it '#each' do + threads = [] + ("a".."z").inject(set, &:<<) # setup a non-empty set + + threads << in_thread do + 2000.times do + size = nil + set.each do |member| + if size.nil? + size = set.length + else + expect(set.length).to eq(size) + end + end + end + end + + threads += (1..19).map do |i| + in_thread do + v = i * 1000 + 10.times do + 200.times { |j| set << (v+j) } + 200.times { |j| set.delete(v+j) } + end + end + end + + threads.map(&:join) + end end end end