Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Concurrent::Set thread-safe on CRuby #906

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 7 additions & 3 deletions lib/concurrent-ruby/concurrent/set.rb
Expand Up @@ -23,9 +23,13 @@ module Concurrent
# @!macro internal_implementation_note
SetImplementation = case
when Concurrent.on_cruby?
# Because MRI never runs code in parallel, the existing
# non-thread-safe structures should usually work fine.
::Set
require 'monitor'
require 'concurrent/thread_safe/util/data_structures'

class CRubySet < ::Set
end
ThreadSafe::Util.make_synchronized_on_cruby Concurrent::CRubySet
CRubySet

when Concurrent.on_jruby?
require 'jruby/synchronized'
Expand Down
19 changes: 19 additions & 0 deletions lib/concurrent-ruby/concurrent/thread_safe/util/data_structures.rb
Expand Up @@ -12,6 +12,25 @@ def self.synchronized(object, &block)
module Concurrent
module ThreadSafe
module Util
def self.make_synchronized_on_cruby(klass)
klass.class_eval do
def initialize(*args, &block)
@_monitor = Monitor.new
super
end
end

klass.superclass.instance_methods(false).each do |method|
klass.class_eval <<-RUBY, __FILE__, __LINE__ + 1
def #{method}(*args)
monitor = @_monitor
monitor or raise("BUG: Internal monitor was not properly initialized. Please report this to the concurrent-ruby developers.")
monitor.synchronize { super }
end
RUBY
end
end

def self.make_synchronized_on_rbx(klass)
klass.class_eval do
private
Expand Down
32 changes: 31 additions & 1 deletion spec/concurrent/set_spec.rb
Expand Up @@ -42,7 +42,7 @@ module Concurrent
end

context 'concurrency' do
it do
it '#add and #delete' do
(1..Concurrent::ThreadSafe::Test::THREADS).map do |i|
in_thread do
1000.times do
Expand All @@ -55,6 +55,36 @@ module Concurrent
end.map(&:join)
expect(set).to be_empty
end

it '#each' do
threads = []
("a".."z").inject(set, &:<<) # setup a non-empty set

threads << in_thread do
2000.times do
size = nil
set.each do |member|
if size.nil?
size = set.length
else
expect(set.length).to eq(size)
end
end
end
end

threads += (1..19).map do |i|
in_thread do
v = i * 1000
10.times do
200.times { |j| set << (v+j) }
200.times { |j| set.delete(v+j) }
end
end
end

threads.map(&:join)
end
end
end
end