Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #717: Reenable java implementations, fix global IO executor on JRuby, bunch of other bug fixes #727

Merged
merged 14 commits into from Jul 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion Gemfile
Expand Up @@ -10,8 +10,9 @@ group :development do
gem 'rake-compiler-dock', '~> 0.6.0'
gem 'gem-compiler', '~> 0.3.0'
gem 'benchmark-ips', '~> 2.7'
end

# documentation
group :documentation do
gem 'countloc', '~> 0.4.0', :platforms => :mri, :require => false
# TODO (pitr-ch 04-May-2018): update to remove: [DEPRECATION] `last_comment` is deprecated. Please use `last_description` instead.
gem 'yard', '~> 0.8.0', :require => false
Expand Down
7 changes: 2 additions & 5 deletions concurrent-ruby-edge.gemspec
@@ -1,8 +1,5 @@
$:.push File.join(File.dirname(__FILE__), 'lib')
$:.push File.join(File.dirname(__FILE__), 'support')

require 'concurrent/version'
require 'file_map'
require_relative 'lib/concurrent/version'
require_relative 'support/file_map'

Gem::Specification.new do |s|
git_files = `git ls-files`.split("\n")
Expand Down
4 changes: 1 addition & 3 deletions concurrent-ruby-ext.gemspec
@@ -1,6 +1,4 @@
$:.push File.join(File.dirname(__FILE__), 'lib')

require 'concurrent/version'
require_relative 'lib/concurrent/version'

Gem::Specification.new do |s|
s.name = 'concurrent-ruby-ext'
Expand Down
29 changes: 11 additions & 18 deletions concurrent-ruby.gemspec
@@ -1,33 +1,26 @@
$:.push File.join(File.dirname(__FILE__), 'lib')
$:.push File.join(File.dirname(__FILE__), 'support')

require 'concurrent/version'
require 'file_map'
require_relative 'lib/concurrent/version'
require_relative 'lib/concurrent/utility/engine'
require_relative 'support/file_map'

Gem::Specification.new do |s|
git_files = `git ls-files`.split("\n")

s.name = 'concurrent-ruby'
s.version = Concurrent::VERSION
s.platform = Gem::Platform::RUBY
s.platform = Concurrent.on_jruby? ? Gem::Platform::JAVA : Gem::Platform::RUBY
s.authors = ["Jerry D'Antonio", 'Petr Chalupa', 'The Ruby Concurrency Team']
s.email = 'concurrent-ruby@googlegroups.com'
s.homepage = 'http://www.concurrent-ruby.com'
s.summary = 'Modern concurrency tools for Ruby. Inspired by Erlang, Clojure, Scala, Haskell, F#, C#, Java, and classic concurrency patterns.'
s.license = 'MIT'
s.date = Time.now.strftime('%Y-%m-%d')
s.files = FileMap::MAP.fetch(:core)
s.files = [*FileMap::MAP.fetch(:core),
*FileMap::MAP.fetch(:spec),
*(Dir['lib/**/*.jar'] if Concurrent.on_jruby?)]
s.extra_rdoc_files = Dir['README*', 'LICENSE*', 'CHANGELOG*']
s.require_paths = ['lib']
s.description = <<-EOF
Modern concurrency tools including agents, futures, promises, thread pools, actors, supervisors, and more.
Inspired by Erlang, Clojure, Go, JavaScript, actors, and classic concurrency patterns.
EOF

if defined?(JRUBY_VERSION)
s.files += Dir['lib/**/*.jar']
s.platform = 'java'
end
s.description = <<-TXT.gsub(/^ +/, '')
Modern concurrency tools including agents, futures, promises, thread pools, actors, supervisors, and more.
Inspired by Erlang, Clojure, Go, JavaScript, actors, and classic concurrency patterns.
TXT

s.required_ruby_version = '>= 1.9.3'
end
34 changes: 34 additions & 0 deletions ext/com/concurrent_ruby/ext/SynchronizationLibrary.java
Expand Up @@ -5,6 +5,7 @@
import org.jruby.RubyClass;
import org.jruby.RubyModule;
import org.jruby.RubyObject;
import org.jruby.RubyThread;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.Block;
Expand Down Expand Up @@ -96,6 +97,14 @@ public void load(Ruby runtime, boolean wrap) throws IOException {

defineClass(runtime, synchronizationModule, "AbstractLockableObject", "JRubyLockableObject",
JRubyLockableObject.class, JRUBY_LOCKABLE_OBJECT_ALLOCATOR);

defineClass(runtime, synchronizationModule, "Object", "JRuby",
JRuby.class, new ObjectAllocator() {
@Override
public IRubyObject allocate(Ruby runtime, RubyClass klazz) {
return new JRuby(runtime, klazz);
}
});
}

private RubyClass defineClass(
Expand Down Expand Up @@ -267,4 +276,29 @@ public IRubyObject nsBroadcast(ThreadContext context) {
return this;
}
}

@JRubyClass(name = "JRuby")
public static class JRuby extends RubyObject {
public JRuby(Ruby runtime, RubyClass metaClass) {
super(runtime, metaClass);
}

@JRubyMethod(name = "sleep_interruptibly", visibility = Visibility.PUBLIC, module = true)
public static IRubyObject sleepInterruptibly(ThreadContext context, IRubyObject receiver, Block block) {
try {
return context.getThread().executeTask(context, block,
new RubyThread.Task<Block, IRubyObject>() {
public IRubyObject run(ThreadContext context, Block block1) throws InterruptedException {
return block1.call(context);
}

public void wakeup(RubyThread thread, Block block1) {
thread.getNativeThread().interrupt();
}
});
} catch (InterruptedException e) {
throw context.runtime.newThreadError("interrupted in Concurrent::Synchronization::JRuby.sleep_interruptibly");
}
}
}
}
2 changes: 1 addition & 1 deletion lib/concurrent/atomic/count_down_latch.rb
Expand Up @@ -55,7 +55,7 @@ module Concurrent
# @!macro internal_implementation_note
CountDownLatchImplementation = case
when Concurrent.on_jruby?
MutexCountDownLatch
JavaCountDownLatch
else
MutexCountDownLatch
end
Expand Down
11 changes: 6 additions & 5 deletions lib/concurrent/atomic/java_count_down_latch.rb
Expand Up @@ -9,19 +9,20 @@ class JavaCountDownLatch

# @!macro count_down_latch_method_initialize
def initialize(count = 1)
unless count.is_a?(Fixnum) && count >= 0
raise ArgumentError.new('count must be in integer greater than or equal zero')
end
Utility::NativeInteger.ensure_integer_and_bounds(count)
Utility::NativeInteger.ensure_positive(count)
@latch = java.util.concurrent.CountDownLatch.new(count)
end

# @!macro count_down_latch_method_wait
def wait(timeout = nil)
if timeout.nil?
@latch.await
Synchronization::JRuby.sleep_interruptibly { @latch.await }
true
else
@latch.await(1000 * timeout, java.util.concurrent.TimeUnit::MILLISECONDS)
Synchronization::JRuby.sleep_interruptibly do
@latch.await(1000 * timeout, java.util.concurrent.TimeUnit::MILLISECONDS)
end
end
end

Expand Down
Expand Up @@ -831,7 +831,7 @@ def rebuild(table)
# no lock needed (or available) if bin >= 0, because we're not popping values from locked_indexes until we've run through the whole table
redo unless (bin >= 0 ? table.cas(i, nil, forwarder) : lock_and_clean_up_reverse_forwarders(table, old_table_size, new_table, i, forwarder))
elsif Node.locked_hash?(node_hash = node.hash)
locked_indexes ||= Array.new
locked_indexes ||= ::Array.new
if bin < 0 && locked_arr_idx > 0
locked_arr_idx -= 1
i, locked_indexes[locked_arr_idx] = locked_indexes[locked_arr_idx], i # swap with another bin
Expand Down
7 changes: 1 addition & 6 deletions lib/concurrent/configuration.rb
Expand Up @@ -175,13 +175,8 @@ def self.new_fast_executor(opts = {})
end

def self.new_io_executor(opts = {})
ThreadPoolExecutor.new(
min_threads: [2, Concurrent.processor_count].max,
max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE,
# max_threads: 1000,
CachedThreadPool.new(
auto_terminate: opts.fetch(:auto_terminate, true),
idletime: 60, # 1 minute
max_queue: 0, # unlimited
fallback_policy: :abort # shouldn't matter -- 0 max queue
)
end
Expand Down
4 changes: 2 additions & 2 deletions lib/concurrent/edge/promises.rb
Expand Up @@ -1797,8 +1797,8 @@ def process_on_blocker_resolution(future, index)

def on_resolvable(resolved_future, index)
all_fulfilled = true
values = Array.new(@Resolutions.size)
reasons = Array.new(@Resolutions.size)
values = ::Array.new(@Resolutions.size)
reasons = ::Array.new(@Resolutions.size)

@Resolutions.each_with_index do |internal_state, i|
fulfilled, values[i], reasons[i] = internal_state.result
Expand Down
8 changes: 5 additions & 3 deletions lib/concurrent/exchanger.rb
Expand Up @@ -315,9 +315,11 @@ def initialize
# @return [Object, CANCEL] the value exchanged by the other thread; {CANCEL} on timeout
def do_exchange(value, timeout)
if timeout.nil?
@exchanger.exchange(value)
Synchronization::JRuby.sleep_interruptibly { @exchanger.exchange(value) }
else
@exchanger.exchange(value, 1000 * timeout, java.util.concurrent.TimeUnit::MILLISECONDS)
Synchronization::JRuby.sleep_interruptibly do
@exchanger.exchange(value, 1000 * timeout, java.util.concurrent.TimeUnit::MILLISECONDS)
end
end
rescue java.util.concurrent.TimeoutException
CANCEL
Expand All @@ -329,7 +331,7 @@ def do_exchange(value, timeout)
# @!macro internal_implementation_note
ExchangerImplementation = case
when Concurrent.on_jruby?
RubyExchanger
JavaExchanger
else
RubyExchanger
end
Expand Down
19 changes: 6 additions & 13 deletions lib/concurrent/executor/cached_thread_pool.rb
Expand Up @@ -46,24 +46,17 @@ def initialize(opts = {})

private

if defined?(JavaThreadPoolExecutor) && self < JavaThreadPoolExecutor
# @!macro cached_thread_pool_method_initialize
# @!visibility private
def ns_initialize(opts)
super(opts)
# @!macro cached_thread_pool_method_initialize
# @!visibility private
def ns_initialize(opts)
super(opts)
if Concurrent.on_jruby?
@max_queue = 0
@executor = java.util.concurrent.Executors.newCachedThreadPool
@executor = java.util.concurrent.Executors.newCachedThreadPool
@executor.setRejectedExecutionHandler(FALLBACK_POLICY_CLASSES[@fallback_policy].new)
@executor.setKeepAliveTime(opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT), java.util.concurrent.TimeUnit::SECONDS)
self.auto_terminate = opts.fetch(:auto_terminate, true)
end
else
# @!macro cached_thread_pool_method_initialize
# @!visibility private
def ns_initialize(opts)
super(opts)
end
end

end
end
17 changes: 10 additions & 7 deletions lib/concurrent/executor/java_thread_pool_executor.rb
Expand Up @@ -90,10 +90,10 @@ def running?
private

def ns_initialize(opts)
min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
@max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
@max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
@fallback_policy = opts.fetch(:fallback_policy, :abort)

raise ArgumentError.new("`max_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if max_length < DEFAULT_MIN_POOL_SIZE
Expand All @@ -109,9 +109,12 @@ def ns_initialize(opts)
end

@executor = java.util.concurrent.ThreadPoolExecutor.new(
min_length, max_length,
idletime, java.util.concurrent.TimeUnit::SECONDS,
queue, FALLBACK_POLICY_CLASSES[@fallback_policy].new)
min_length,
max_length,
idletime,
java.util.concurrent.TimeUnit::SECONDS,
queue,
FALLBACK_POLICY_CLASSES[@fallback_policy].new)

self.auto_terminate = opts.fetch(:auto_terminate, true)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/concurrent/executor/single_thread_executor.rb
Expand Up @@ -8,7 +8,7 @@ module Concurrent

SingleThreadExecutorImplementation = case
when Concurrent.on_jruby?
RubySingleThreadExecutor
JavaSingleThreadExecutor
else
RubySingleThreadExecutor
end
Expand Down
10 changes: 5 additions & 5 deletions lib/concurrent/executor/thread_pool_executor.rb
Expand Up @@ -9,7 +9,7 @@ module Concurrent

ThreadPoolExecutorImplementation = case
when Concurrent.on_jruby?
RubyThreadPoolExecutor
JavaThreadPoolExecutor
else
RubyThreadPoolExecutor
end
Expand Down Expand Up @@ -58,9 +58,9 @@ class ThreadPoolExecutor < ThreadPoolExecutorImplementation
# @!macro [new] thread_pool_executor_method_initialize
#
# Create a new thread pool.
#
#
# @param [Hash] opts the options which configure the thread pool.
#
#
# @option opts [Integer] :max_threads (DEFAULT_MAX_POOL_SIZE) the maximum
# number of threads to be created
# @option opts [Integer] :min_threads (DEFAULT_MIN_POOL_SIZE) When a new task is submitted
Expand All @@ -73,12 +73,12 @@ class ThreadPoolExecutor < ThreadPoolExecutorImplementation
# @option opts [Symbol] :fallback_policy (:abort) the policy for handling new
# tasks that are received when the queue size has reached
# `max_queue` or the executor has shut down
#
#
# @raise [ArgumentError] if `:max_threads` is less than one
# @raise [ArgumentError] if `:min_threads` is less than zero
# @raise [ArgumentError] if `:fallback_policy` is not one of the values specified
# in `FALLBACK_POLICIES`
#
#
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html

# @!method initialize(opts = {})
Expand Down
5 changes: 2 additions & 3 deletions lib/concurrent/synchronization.rb
Expand Up @@ -7,15 +7,14 @@
require 'concurrent/synchronization/mri_object'
require 'concurrent/synchronization/jruby_object'
require 'concurrent/synchronization/rbx_object'
require 'concurrent/synchronization/truffle_object'
require 'concurrent/synchronization/truffleruby_object'
require 'concurrent/synchronization/object'
require 'concurrent/synchronization/volatile'

require 'concurrent/synchronization/abstract_lockable_object'
require 'concurrent/synchronization/mri_lockable_object'
require 'concurrent/synchronization/mutex_lockable_object'
require 'concurrent/synchronization/jruby_lockable_object'
require 'concurrent/synchronization/rbx_lockable_object'
require 'concurrent/synchronization/truffle_lockable_object'

require 'concurrent/synchronization/lockable_object'

Expand Down
8 changes: 4 additions & 4 deletions lib/concurrent/synchronization/lockable_object.rb
Expand Up @@ -5,18 +5,18 @@ module Synchronization
# @!macro internal_implementation_note
LockableObjectImplementation = case
when Concurrent.on_cruby? && Concurrent.ruby_version(:<=, 1, 9, 3)
MriMonitorLockableObject
MonitorLockableObject
when Concurrent.on_cruby? && Concurrent.ruby_version(:>, 1, 9, 3)
MriMutexLockableObject
MutexLockableObject
when Concurrent.on_jruby?
JRubyLockableObject
when Concurrent.on_rbx?
RbxLockableObject
when Concurrent.on_truffleruby?
MriMutexLockableObject
MutexLockableObject
else
warn 'Possibly unsupported Ruby implementation'
MriMonitorLockableObject
MonitorLockableObject
end
private_constant :LockableObjectImplementation

Expand Down