Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Set thread safety #911

Merged
merged 3 commits into from Jun 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 14 additions & 6 deletions lib/concurrent-ruby/concurrent/set.rb
Expand Up @@ -19,20 +19,27 @@ module Concurrent
#
# @see http://ruby-doc.org/stdlib-2.4.0/libdoc/set/rdoc/Set.html Ruby standard library `Set`


# @!macro internal_implementation_note
SetImplementation = case
when Concurrent.on_cruby?
# Because MRI never runs code in parallel, the existing
# non-thread-safe structures should usually work fine.
::Set
# The CRuby implementation of Set is written in Ruby itself and is
# not thread safe for certain methods.
require 'monitor'
require 'concurrent/thread_safe/util/data_structures'

class CRubySet < ::Set
end

ThreadSafe::Util.make_synchronized_on_cruby CRubySet
CRubySet

when Concurrent.on_jruby?
require 'jruby/synchronized'

class JRubySet < ::Set
include JRuby::Synchronized
end

JRubySet

when Concurrent.on_rbx?
Expand All @@ -41,7 +48,8 @@ class JRubySet < ::Set

class RbxSet < ::Set
end
ThreadSafe::Util.make_synchronized_on_rbx Concurrent::RbxSet

ThreadSafe::Util.make_synchronized_on_rbx RbxSet
RbxSet

when Concurrent.on_truffleruby?
Expand All @@ -50,7 +58,7 @@ class RbxSet < ::Set
class TruffleRubySet < ::Set
end

ThreadSafe::Util.make_synchronized_on_truffleruby Concurrent::TruffleRubySet
ThreadSafe::Util.make_synchronized_on_truffleruby TruffleRubySet
TruffleRubySet

else
Expand Down
Expand Up @@ -12,12 +12,37 @@ def self.synchronized(object, &block)
module Concurrent
module ThreadSafe
module Util
def self.make_synchronized_on_cruby(klass)
klass.class_eval do
def initialize(*args, &block)
@_monitor = Monitor.new
super
end

def initialize_copy(other)
# make sure a copy is not sharing a monitor with the original object!
@_monitor = Monitor.new
super
end
end

klass.superclass.instance_methods(false).each do |method|
klass.class_eval <<-RUBY, __FILE__, __LINE__ + 1
def #{method}(*args)
monitor = @_monitor
monitor or raise("BUG: Internal monitor was not properly initialized. Please report this to the concurrent-ruby developers.")
monitor.synchronize { super }
end
RUBY
end
end

def self.make_synchronized_on_rbx(klass)
klass.class_eval do
private

def _mon_initialize
@_monitor = Monitor.new unless @_monitor # avoid double initialisation
@_monitor ||= Monitor.new # avoid double initialisation
end

def self.new(*args)
Expand Down
2 changes: 1 addition & 1 deletion lib/concurrent-ruby/concurrent/version.rb
@@ -1,3 +1,3 @@
module Concurrent
VERSION = '1.1.8'
VERSION = '1.1.8'
end
59 changes: 58 additions & 1 deletion spec/concurrent/set_spec.rb
Expand Up @@ -42,7 +42,7 @@ module Concurrent
end

context 'concurrency' do
it do
it '#add and #delete' do
(1..Concurrent::ThreadSafe::Test::THREADS).map do |i|
in_thread do
1000.times do
Expand All @@ -55,6 +55,63 @@ module Concurrent
end.map(&:join)
expect(set).to be_empty
end

it 'force context switch' do
barrier = Concurrent::CyclicBarrier.new(2)

# methods like include? or delete? are implemented for CRuby in Ruby itself
# @see https://github.com/ruby/ruby/blob/master/lib/set.rb
set.clear

# add a single element
set.add(1)

# This thread should start and `Set#reject!` in CRuby should cache a value of `0` for size
thread_reject = in_thread do
# we expect this to return nil since nothing should have changed.
expect(set.reject! do |v|
barrier.wait
v == 1 # only delete the 1 value
end).to eq set
end

thread_add = in_thread do
barrier.wait
expect(set.add?(1)).to eq set
end

join_with [thread_reject, thread_add]
end

it '#each' do
threads = []
("a".."z").inject(set, &:<<) # setup a non-empty set

threads << in_thread do
2000.times do
size = nil
set.each do |member|
if size.nil?
size = set.length
else
expect(set.length).to eq(size)
end
end
end
end

threads += (1..19).map do |i|
in_thread do
v = i * 1000
10.times do
200.times { |j| set << (v+j) }
200.times { |j| set.delete(v+j) }
end
end
end

threads.map(&:join)
end
end
end
end