Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial implementation of thread pool prestart for CRuby #1031

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb
Expand Up @@ -144,6 +144,8 @@ module Concurrent
# subsequent tasks will be rejected in accordance with the configured `fallback_policy`.
# * `auto_terminate`: When true (default), the threads started will be marked as daemon.
# * `fallback_policy`: The policy defining how rejected tasks are handled.
# * `prestart`: Defaults to false. When true, the minimum number of threads
# will be started when the pool is created.
#
# Three fallback policies are supported:
#
Expand Down
Expand Up @@ -119,6 +119,14 @@ def prune_pool
synchronize { ns_prune_pool }
end

def prestartCoreThread
ns_add_idle_worker
end

def prestartAllCoreThreads
@min_length.times { ns_add_idle_worker }
end

private

# @!visibility private
Expand Down Expand Up @@ -149,6 +157,10 @@ def ns_initialize(opts)

@gc_interval = opts.fetch(:gc_interval, @idletime / 2.0).to_i # undocumented
@next_gc_time = Concurrent.monotonic_time + @gc_interval

if opts.fetch(:prestart, false)
prestartAllCoreThreads
end
end

# @!visibility private
Expand Down Expand Up @@ -247,6 +259,20 @@ def ns_add_busy_worker
worker
end

# creates new worker which is ready
# @return [nil, Worker] nil of max capacity is reached
#
# @!visibility private
def ns_add_idle_worker
return if @pool.size >= @max_length

@workers_counter += 1
@pool << (worker = Worker.new(self, @workers_counter))
@largest_length = @pool.length if @pool.length > @largest_length
@ready << [worker, Concurrent.monotonic_time]
worker
end

# handle ready worker, giving it new job or assigning back to @ready
#
# @!visibility private
Expand Down
21 changes: 21 additions & 0 deletions spec/concurrent/executor/fixed_thread_pool_spec.rb
Expand Up @@ -177,6 +177,19 @@ module Concurrent
end
end

context 'prestart' do

it 'starts threads when prestart is called' do
pool = described_class.new(5, prestart: false)
expect(pool.length).to eq 0
pool.prestartAllCoreThreads
expect(pool.length).to eq 5
pool.shutdown
expect(pool.wait_for_termination(pool_termination_timeout)).to eq true
end

end

context 'worker creation and caching' do

it 'never creates more than :num_threads threads' do
Expand All @@ -189,6 +202,14 @@ module Concurrent
pool.shutdown
expect(pool.wait_for_termination(pool_termination_timeout)).to eq true
end

it 'creates threads on creation if :prestart is true' do
pool = described_class.new(5, prestart: true)
expect(pool.length).to eq 5
pool.shutdown
expect(pool.wait_for_termination(pool_termination_timeout)).to eq true
end

end

context 'fallback policy' do
Expand Down