Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RubyThreadLocalVar: rely on GIL on MRI to avoid problems with thread/mutex/queue in finalizers #856

Merged
merged 1 commit into from Jul 20, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
91 changes: 49 additions & 42 deletions lib/concurrent-ruby/concurrent/atomic/ruby_thread_local_var.rb
Expand Up @@ -28,38 +28,27 @@ class RubyThreadLocalVar < AbstractThreadLocalVar
# But when a Thread is GC'd, we need to drop the reference to its thread-local
# array, so we don't leak memory

# @!visibility private
FREE = []
LOCK = Mutex.new
ARRAYS = {} # used as a hash set
# noinspection RubyClassVariableUsageInspection
@@next = 0
QUEUE = Queue.new
THREAD = Thread.new do
while true
method, i = QUEUE.pop
case method
when :thread_local_finalizer
LOCK.synchronize do
FREE.push(i)
# The cost of GC'ing a TLV is linear in the number of threads using TLVs
# But that is natural! More threads means more storage is used per TLV
# So naturally more CPU time is required to free more storage
ARRAYS.each_value do |array|
array[i] = nil
end
end
when :thread_finalizer
LOCK.synchronize do
# The thread which used this thread-local array is now gone
# So don't hold onto a reference to the array (thus blocking GC)
ARRAYS.delete(i)
end
end
FREE = []
LOCK = Mutex.new
THREAD_LOCAL_ARRAYS = {} # used as a hash set

# synchronize when not on MRI
# on MRI using lock in finalizer leads to "can't be called from trap context" error
# so the code is carefully written to be tread-safe on MRI relying on GIL

if Concurrent.on_cruby?
# @!visibility private
def self.semi_sync(&block)
block.call
end
else
# @!visibility private
def self.semi_sync(&block)
LOCK.synchronize(&block)
end
end

private_constant :FREE, :LOCK, :ARRAYS, :QUEUE, :THREAD
private_constant :FREE, :LOCK, :THREAD_LOCAL_ARRAYS

# @!macro thread_local_var_method_get
def value
Expand All @@ -85,7 +74,7 @@ def value=(value)
# Using Ruby's built-in thread-local storage is faster
unless (array = get_threadlocal_array(me))
array = set_threadlocal_array([], me)
LOCK.synchronize { ARRAYS[array.object_id] = array }
self.class.semi_sync { THREAD_LOCAL_ARRAYS[array.object_id] = array }
ObjectSpace.define_finalizer(me, self.class.thread_finalizer(array.object_id))
end
array[@index] = (value.nil? ? NULL : value)
Expand All @@ -95,32 +84,50 @@ def value=(value)
protected

# @!visibility private
# noinspection RubyClassVariableUsageInspection
def allocate_storage
@index = LOCK.synchronize do
FREE.pop || begin
result = @@next
@@next += 1
result
end
end
@index = FREE.pop || next_index

ObjectSpace.define_finalizer(self, self.class.thread_local_finalizer(@index))
end

# @!visibility private
def self.thread_local_finalizer(index)
# avoid error: can't be called from trap context
proc { QUEUE.push [:thread_local_finalizer, index] }
proc do
semi_sync do
# The cost of GC'ing a TLV is linear in the number of threads using TLVs
# But that is natural! More threads means more storage is used per TLV
# So naturally more CPU time is required to free more storage
THREAD_LOCAL_ARRAYS.each_value { |array| array[index] = nil }
# free index has to be published after the arrays are cleared
FREE.push(index)
end
end
end

# @!visibility private
def self.thread_finalizer(id)
# avoid error: can't be called from trap context
proc { QUEUE.push [:thread_finalizer, id] }
proc do
semi_sync do
# The thread which used this thread-local array is now gone
# So don't hold onto a reference to the array (thus blocking GC)
THREAD_LOCAL_ARRAYS.delete(id)
end
end
end

private

# noinspection RubyClassVariableUsageInspection
@@next = 0
# noinspection RubyClassVariableUsageInspection
def next_index
LOCK.synchronize do
result = @@next
@@next += 1
result
end
end

if Thread.instance_methods.include?(:thread_variable_get)

def get_threadlocal_array(thread = Thread.current)
Expand Down