Skip to content

Commit

Permalink
Merge pull request #727 from ruby-concurrency/pitr-ch/pool
Browse files Browse the repository at this point in the history
Fix #717: Reenable java implementations, fix global IO executor on JRuby, bunch of other bug fixes
  • Loading branch information
pitr-ch committed Jul 7, 2018
2 parents 3008823 + a026804 commit 6ceda0b
Show file tree
Hide file tree
Showing 45 changed files with 526 additions and 444 deletions.
3 changes: 2 additions & 1 deletion Gemfile
Expand Up @@ -10,8 +10,9 @@ group :development do
gem 'rake-compiler-dock', '~> 0.6.0'
gem 'gem-compiler', '~> 0.3.0'
gem 'benchmark-ips', '~> 2.7'
end

# documentation
group :documentation do
gem 'countloc', '~> 0.4.0', :platforms => :mri, :require => false
# TODO (pitr-ch 04-May-2018): update to remove: [DEPRECATION] `last_comment` is deprecated. Please use `last_description` instead.
gem 'yard', '~> 0.8.0', :require => false
Expand Down
7 changes: 2 additions & 5 deletions concurrent-ruby-edge.gemspec
@@ -1,8 +1,5 @@
$:.push File.join(File.dirname(__FILE__), 'lib')
$:.push File.join(File.dirname(__FILE__), 'support')

require 'concurrent/version'
require 'file_map'
require_relative 'lib/concurrent/version'
require_relative 'support/file_map'

Gem::Specification.new do |s|
git_files = `git ls-files`.split("\n")
Expand Down
4 changes: 1 addition & 3 deletions concurrent-ruby-ext.gemspec
@@ -1,6 +1,4 @@
$:.push File.join(File.dirname(__FILE__), 'lib')

require 'concurrent/version'
require_relative 'lib/concurrent/version'

Gem::Specification.new do |s|
s.name = 'concurrent-ruby-ext'
Expand Down
29 changes: 11 additions & 18 deletions concurrent-ruby.gemspec
@@ -1,33 +1,26 @@
$:.push File.join(File.dirname(__FILE__), 'lib')
$:.push File.join(File.dirname(__FILE__), 'support')

require 'concurrent/version'
require 'file_map'
require_relative 'lib/concurrent/version'
require_relative 'lib/concurrent/utility/engine'
require_relative 'support/file_map'

Gem::Specification.new do |s|
git_files = `git ls-files`.split("\n")

s.name = 'concurrent-ruby'
s.version = Concurrent::VERSION
s.platform = Gem::Platform::RUBY
s.platform = Concurrent.on_jruby? ? Gem::Platform::JAVA : Gem::Platform::RUBY
s.authors = ["Jerry D'Antonio", 'Petr Chalupa', 'The Ruby Concurrency Team']
s.email = 'concurrent-ruby@googlegroups.com'
s.homepage = 'http://www.concurrent-ruby.com'
s.summary = 'Modern concurrency tools for Ruby. Inspired by Erlang, Clojure, Scala, Haskell, F#, C#, Java, and classic concurrency patterns.'
s.license = 'MIT'
s.date = Time.now.strftime('%Y-%m-%d')
s.files = FileMap::MAP.fetch(:core)
s.files = [*FileMap::MAP.fetch(:core),
*FileMap::MAP.fetch(:spec),
*(Dir['lib/**/*.jar'] if Concurrent.on_jruby?)]
s.extra_rdoc_files = Dir['README*', 'LICENSE*', 'CHANGELOG*']
s.require_paths = ['lib']
s.description = <<-EOF
Modern concurrency tools including agents, futures, promises, thread pools, actors, supervisors, and more.
Inspired by Erlang, Clojure, Go, JavaScript, actors, and classic concurrency patterns.
EOF

if defined?(JRUBY_VERSION)
s.files += Dir['lib/**/*.jar']
s.platform = 'java'
end
s.description = <<-TXT.gsub(/^ +/, '')
Modern concurrency tools including agents, futures, promises, thread pools, actors, supervisors, and more.
Inspired by Erlang, Clojure, Go, JavaScript, actors, and classic concurrency patterns.
TXT

s.required_ruby_version = '>= 1.9.3'
end
34 changes: 34 additions & 0 deletions ext/com/concurrent_ruby/ext/SynchronizationLibrary.java
Expand Up @@ -5,6 +5,7 @@
import org.jruby.RubyClass;
import org.jruby.RubyModule;
import org.jruby.RubyObject;
import org.jruby.RubyThread;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.Block;
Expand Down Expand Up @@ -96,6 +97,14 @@ public void load(Ruby runtime, boolean wrap) throws IOException {

defineClass(runtime, synchronizationModule, "AbstractLockableObject", "JRubyLockableObject",
JRubyLockableObject.class, JRUBY_LOCKABLE_OBJECT_ALLOCATOR);

defineClass(runtime, synchronizationModule, "Object", "JRuby",
JRuby.class, new ObjectAllocator() {
@Override
public IRubyObject allocate(Ruby runtime, RubyClass klazz) {
return new JRuby(runtime, klazz);
}
});
}

private RubyClass defineClass(
Expand Down Expand Up @@ -267,4 +276,29 @@ public IRubyObject nsBroadcast(ThreadContext context) {
return this;
}
}

@JRubyClass(name = "JRuby")
public static class JRuby extends RubyObject {
public JRuby(Ruby runtime, RubyClass metaClass) {
super(runtime, metaClass);
}

@JRubyMethod(name = "sleep_interruptibly", visibility = Visibility.PUBLIC, module = true)
public static IRubyObject sleepInterruptibly(ThreadContext context, IRubyObject receiver, Block block) {
try {
return context.getThread().executeTask(context, block,
new RubyThread.Task<Block, IRubyObject>() {
public IRubyObject run(ThreadContext context, Block block1) throws InterruptedException {
return block1.call(context);
}

public void wakeup(RubyThread thread, Block block1) {
thread.getNativeThread().interrupt();
}
});
} catch (InterruptedException e) {
throw context.runtime.newThreadError("interrupted in Concurrent::Synchronization::JRuby.sleep_interruptibly");
}
}
}
}
2 changes: 1 addition & 1 deletion lib/concurrent/atomic/count_down_latch.rb
Expand Up @@ -55,7 +55,7 @@ module Concurrent
# @!macro internal_implementation_note
CountDownLatchImplementation = case
when Concurrent.on_jruby?
MutexCountDownLatch
JavaCountDownLatch
else
MutexCountDownLatch
end
Expand Down
11 changes: 6 additions & 5 deletions lib/concurrent/atomic/java_count_down_latch.rb
Expand Up @@ -9,19 +9,20 @@ class JavaCountDownLatch

# @!macro count_down_latch_method_initialize
def initialize(count = 1)
unless count.is_a?(Fixnum) && count >= 0
raise ArgumentError.new('count must be in integer greater than or equal zero')
end
Utility::NativeInteger.ensure_integer_and_bounds(count)
Utility::NativeInteger.ensure_positive(count)
@latch = java.util.concurrent.CountDownLatch.new(count)
end

# @!macro count_down_latch_method_wait
def wait(timeout = nil)
if timeout.nil?
@latch.await
Synchronization::JRuby.sleep_interruptibly { @latch.await }
true
else
@latch.await(1000 * timeout, java.util.concurrent.TimeUnit::MILLISECONDS)
Synchronization::JRuby.sleep_interruptibly do
@latch.await(1000 * timeout, java.util.concurrent.TimeUnit::MILLISECONDS)
end
end
end

Expand Down
Expand Up @@ -831,7 +831,7 @@ def rebuild(table)
# no lock needed (or available) if bin >= 0, because we're not popping values from locked_indexes until we've run through the whole table
redo unless (bin >= 0 ? table.cas(i, nil, forwarder) : lock_and_clean_up_reverse_forwarders(table, old_table_size, new_table, i, forwarder))
elsif Node.locked_hash?(node_hash = node.hash)
locked_indexes ||= Array.new
locked_indexes ||= ::Array.new
if bin < 0 && locked_arr_idx > 0
locked_arr_idx -= 1
i, locked_indexes[locked_arr_idx] = locked_indexes[locked_arr_idx], i # swap with another bin
Expand Down
7 changes: 1 addition & 6 deletions lib/concurrent/configuration.rb
Expand Up @@ -175,13 +175,8 @@ def self.new_fast_executor(opts = {})
end

def self.new_io_executor(opts = {})
ThreadPoolExecutor.new(
min_threads: [2, Concurrent.processor_count].max,
max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE,
# max_threads: 1000,
CachedThreadPool.new(
auto_terminate: opts.fetch(:auto_terminate, true),
idletime: 60, # 1 minute
max_queue: 0, # unlimited
fallback_policy: :abort # shouldn't matter -- 0 max queue
)
end
Expand Down
4 changes: 2 additions & 2 deletions lib/concurrent/edge/promises.rb
Expand Up @@ -1797,8 +1797,8 @@ def process_on_blocker_resolution(future, index)

def on_resolvable(resolved_future, index)
all_fulfilled = true
values = Array.new(@Resolutions.size)
reasons = Array.new(@Resolutions.size)
values = ::Array.new(@Resolutions.size)
reasons = ::Array.new(@Resolutions.size)

@Resolutions.each_with_index do |internal_state, i|
fulfilled, values[i], reasons[i] = internal_state.result
Expand Down
8 changes: 5 additions & 3 deletions lib/concurrent/exchanger.rb
Expand Up @@ -315,9 +315,11 @@ def initialize
# @return [Object, CANCEL] the value exchanged by the other thread; {CANCEL} on timeout
def do_exchange(value, timeout)
if timeout.nil?
@exchanger.exchange(value)
Synchronization::JRuby.sleep_interruptibly { @exchanger.exchange(value) }
else
@exchanger.exchange(value, 1000 * timeout, java.util.concurrent.TimeUnit::MILLISECONDS)
Synchronization::JRuby.sleep_interruptibly do
@exchanger.exchange(value, 1000 * timeout, java.util.concurrent.TimeUnit::MILLISECONDS)
end
end
rescue java.util.concurrent.TimeoutException
CANCEL
Expand All @@ -329,7 +331,7 @@ def do_exchange(value, timeout)
# @!macro internal_implementation_note
ExchangerImplementation = case
when Concurrent.on_jruby?
RubyExchanger
JavaExchanger
else
RubyExchanger
end
Expand Down
19 changes: 6 additions & 13 deletions lib/concurrent/executor/cached_thread_pool.rb
Expand Up @@ -46,24 +46,17 @@ def initialize(opts = {})

private

if defined?(JavaThreadPoolExecutor) && self < JavaThreadPoolExecutor
# @!macro cached_thread_pool_method_initialize
# @!visibility private
def ns_initialize(opts)
super(opts)
# @!macro cached_thread_pool_method_initialize
# @!visibility private
def ns_initialize(opts)
super(opts)
if Concurrent.on_jruby?
@max_queue = 0
@executor = java.util.concurrent.Executors.newCachedThreadPool
@executor = java.util.concurrent.Executors.newCachedThreadPool
@executor.setRejectedExecutionHandler(FALLBACK_POLICY_CLASSES[@fallback_policy].new)
@executor.setKeepAliveTime(opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT), java.util.concurrent.TimeUnit::SECONDS)
self.auto_terminate = opts.fetch(:auto_terminate, true)
end
else
# @!macro cached_thread_pool_method_initialize
# @!visibility private
def ns_initialize(opts)
super(opts)
end
end

end
end
17 changes: 10 additions & 7 deletions lib/concurrent/executor/java_thread_pool_executor.rb
Expand Up @@ -90,10 +90,10 @@ def running?
private

def ns_initialize(opts)
min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
@max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
@max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
@fallback_policy = opts.fetch(:fallback_policy, :abort)

raise ArgumentError.new("`max_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if max_length < DEFAULT_MIN_POOL_SIZE
Expand All @@ -109,9 +109,12 @@ def ns_initialize(opts)
end

@executor = java.util.concurrent.ThreadPoolExecutor.new(
min_length, max_length,
idletime, java.util.concurrent.TimeUnit::SECONDS,
queue, FALLBACK_POLICY_CLASSES[@fallback_policy].new)
min_length,
max_length,
idletime,
java.util.concurrent.TimeUnit::SECONDS,
queue,
FALLBACK_POLICY_CLASSES[@fallback_policy].new)

self.auto_terminate = opts.fetch(:auto_terminate, true)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/concurrent/executor/single_thread_executor.rb
Expand Up @@ -8,7 +8,7 @@ module Concurrent

SingleThreadExecutorImplementation = case
when Concurrent.on_jruby?
RubySingleThreadExecutor
JavaSingleThreadExecutor
else
RubySingleThreadExecutor
end
Expand Down
10 changes: 5 additions & 5 deletions lib/concurrent/executor/thread_pool_executor.rb
Expand Up @@ -9,7 +9,7 @@ module Concurrent

ThreadPoolExecutorImplementation = case
when Concurrent.on_jruby?
RubyThreadPoolExecutor
JavaThreadPoolExecutor
else
RubyThreadPoolExecutor
end
Expand Down Expand Up @@ -58,9 +58,9 @@ class ThreadPoolExecutor < ThreadPoolExecutorImplementation
# @!macro [new] thread_pool_executor_method_initialize
#
# Create a new thread pool.
#
#
# @param [Hash] opts the options which configure the thread pool.
#
#
# @option opts [Integer] :max_threads (DEFAULT_MAX_POOL_SIZE) the maximum
# number of threads to be created
# @option opts [Integer] :min_threads (DEFAULT_MIN_POOL_SIZE) When a new task is submitted
Expand All @@ -73,12 +73,12 @@ class ThreadPoolExecutor < ThreadPoolExecutorImplementation
# @option opts [Symbol] :fallback_policy (:abort) the policy for handling new
# tasks that are received when the queue size has reached
# `max_queue` or the executor has shut down
#
#
# @raise [ArgumentError] if `:max_threads` is less than one
# @raise [ArgumentError] if `:min_threads` is less than zero
# @raise [ArgumentError] if `:fallback_policy` is not one of the values specified
# in `FALLBACK_POLICIES`
#
#
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html

# @!method initialize(opts = {})
Expand Down
5 changes: 2 additions & 3 deletions lib/concurrent/synchronization.rb
Expand Up @@ -7,15 +7,14 @@
require 'concurrent/synchronization/mri_object'
require 'concurrent/synchronization/jruby_object'
require 'concurrent/synchronization/rbx_object'
require 'concurrent/synchronization/truffle_object'
require 'concurrent/synchronization/truffleruby_object'
require 'concurrent/synchronization/object'
require 'concurrent/synchronization/volatile'

require 'concurrent/synchronization/abstract_lockable_object'
require 'concurrent/synchronization/mri_lockable_object'
require 'concurrent/synchronization/mutex_lockable_object'
require 'concurrent/synchronization/jruby_lockable_object'
require 'concurrent/synchronization/rbx_lockable_object'
require 'concurrent/synchronization/truffle_lockable_object'

require 'concurrent/synchronization/lockable_object'

Expand Down
8 changes: 4 additions & 4 deletions lib/concurrent/synchronization/lockable_object.rb
Expand Up @@ -5,18 +5,18 @@ module Synchronization
# @!macro internal_implementation_note
LockableObjectImplementation = case
when Concurrent.on_cruby? && Concurrent.ruby_version(:<=, 1, 9, 3)
MriMonitorLockableObject
MonitorLockableObject
when Concurrent.on_cruby? && Concurrent.ruby_version(:>, 1, 9, 3)
MriMutexLockableObject
MutexLockableObject
when Concurrent.on_jruby?
JRubyLockableObject
when Concurrent.on_rbx?
RbxLockableObject
when Concurrent.on_truffleruby?
MriMutexLockableObject
MutexLockableObject
else
warn 'Possibly unsupported Ruby implementation'
MriMonitorLockableObject
MonitorLockableObject
end
private_constant :LockableObjectImplementation

Expand Down

0 comments on commit 6ceda0b

Please sign in to comment.