Skip to content

Commit

Permalink
Add Concurrent.usable_processor_count that is cgroups aware
Browse files Browse the repository at this point in the history
Closes: #1035

A running gag since the introduction of containerization is software
that starts one process per logical or physical core while running
inside a container with a restricted CPU quota, and totally blows
up memory usage in containerized environments.

The proper question to ask is how many CPU cores are usable, not how
many the machine has. To do that we have to read the cgroup info
from `/sys`. There are two ways of doing it, depending on the version
of cgroups in use.

Co-Authored-By: usiegl00 <50933431+usiegl00@users.noreply.github.com>
  • Loading branch information
byroot and usiegl00 committed Feb 1, 2024
1 parent e9748af commit 165b061
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 0 deletions.
54 changes: 54 additions & 0 deletions lib/concurrent-ruby/concurrent/utility/processor_counter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class ProcessorCounter
# Wraps each computation in a Delay so that nothing is probed at
# construction time; the work happens when the matching reader asks the
# Delay for its value.
def initialize
  @processor_count = Delay.new do
    compute_processor_count
  end

  @physical_processor_count = Delay.new do
    compute_physical_processor_count
  end

  @cpu_quota = Delay.new do
    compute_cpu_quota
  end
end

def processor_count
Expand All @@ -21,6 +22,25 @@ def physical_processor_count
@physical_processor_count.value
end

# Number of processors this process can actually use, as a Float.
#
# Starts from #processor_count and lowers it to #cpu_quota when a quota
# exists and is smaller than the processor count.
#
# @return [Float] the processor count, or the CPU quota if that is lower
def available_processor_count
  total = processor_count.to_f
  limit = cpu_quota

  # Without a quota the whole machine is usable. A quota may also be
  # configured above the real number of cores, in which case the core
  # count remains the effective ceiling.
  if limit.nil? || limit > total
    total
  else
    limit
  end
end

# CPU quota applied to this process, if any.
#
# Reads the value of the `@cpu_quota` Delay created in #initialize, whose
# block calls #compute_cpu_quota.
#
# @return [Float, nil] whatever #compute_cpu_quota produced
def cpu_quota
@cpu_quota.value
end

private

def compute_processor_count
Expand Down Expand Up @@ -60,6 +80,24 @@ def compute_physical_processor_count
rescue
return 1
end

# Reads the CPU quota of the current cgroup from `/sys/fs/cgroup`.
#
# Only attempted when the target OS matches "linux". The cgroups v2 file
# (`cpu.max`) is checked first, then the cgroups v1 files
# (`cpu.cfs_quota_us` / `cpu.cfs_period_us`).
#
# @return [Float, nil] quota expressed as a number of CPUs (quota divided
#   by period), or nil when not on Linux, when neither cgroup file exists,
#   or when the cgroup imposes no limit
def compute_cpu_quota
  return nil unless RbConfig::CONFIG["target_os"].match(/linux/i)

  if File.exist?("/sys/fs/cgroup/cpu.max")
    # cgroups v2: https://docs.kernel.org/admin-guide/cgroup-v2.html#cpu-interface-files
    # The file contains "$MAX $PERIOD"; a literal "max" means unlimited.
    cpu_max = File.read("/sys/fs/cgroup/cpu.max")
    return nil if cpu_max.start_with?("max ") # no limit
    max, period = cpu_max.split.map(&:to_f)
    max / period
  elsif File.exist?("/sys/fs/cgroup/cpu,cpuacct/cpu.cfs_quota_us")
    # cgroups v1: https://kernel.googlesource.com/pub/scm/linux/kernel/git/glommer/memcg/+/cpu_stat/Documentation/cgroups/cpu.txt
    max = File.read("/sys/fs/cgroup/cpu,cpuacct/cpu.cfs_quota_us").to_i
    # cgroups v1 reports "no bandwidth restriction" as -1
    # (https://docs.kernel.org/scheduler/sched-bwc.html#management).
    # Unparseable content becomes 0 through to_i; both cases mean "no quota".
    return nil if max <= 0
    period = File.read("/sys/fs/cgroup/cpu,cpuacct/cpu.cfs_period_us").to_f
    max / period
  end
end
end
end

Expand Down Expand Up @@ -107,4 +145,20 @@ def self.processor_count
def self.physical_processor_count
processor_counter.physical_processor_count
end

# Number of processors usable for process scheduling.
#
# This takes the CPU quota into account when the process is inside a
# cgroup with a dedicated CPU quota (typically Docker): the result is the
# quota when it is lower than the processor count. When there is no quota,
# or the quota is higher than the processor count, the processor count is
# returned as a `Float`. This method never returns `nil`.
#
# For performance reasons the calculated value will be memoized on the first
# call.
#
# @return [Float] number of usable processors
def self.available_processor_count
processor_counter.available_processor_count
end

# CPU quota of the cgroup the process runs in, expressed as a number of
# processors (quota divided by period).
#
# Returns `nil` when no quota is detected: the target OS is not Linux, no
# cgroup CPU file is found under `/sys/fs/cgroup`, or the cgroup sets no
# limit. The quota is not capped to the machine's processor count here;
# `available_processor_count` does that.
#
# @return [nil, Float] the CPU quota, or nil if none is detected
def self.cpu_quota
processor_counter.cpu_quota
end
end
75 changes: 75 additions & 0 deletions spec/concurrent/utility/processor_count_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,79 @@ module Concurrent
expect(Concurrent::physical_processor_count).to be >= 1
end
end

# Specs for ProcessorCounter#cpu_quota. Each example builds a fresh counter
# and stubs RbConfig / File so the cgroup files never have to exist on the
# machine running the suite.
RSpec.describe '#cpu_quota' do

# A new counter per example, so each one evaluates its own quota Delay.
let(:counter) { Concurrent::Utility::ProcessorCounter.new }

# The module-level helper and a fresh counter must agree on the real host.
it 'returns #compute_cpu_quota' do
expect(Concurrent::cpu_quota).to be == counter.cpu_quota
end

it 'returns nil if no quota is detected' do
# File.exist? is only consulted on Linux targets, so only stub it there.
if RbConfig::CONFIG["target_os"].match(/linux/i)
expect(File).to receive(:exist?).twice.and_return(nil) # Checks for cgroups V1 and V2
end
expect(counter.cpu_quota).to be_nil
end

it 'returns nil if cgroups v2 sets no limit' do
expect(RbConfig::CONFIG).to receive(:[]).with("target_os").and_return("linux")
expect(File).to receive(:exist?).with("/sys/fs/cgroup/cpu.max").and_return(true)
# cpu.max content is "<max> <period>"; a leading "max" means unlimited.
expect(File).to receive(:read).with("/sys/fs/cgroup/cpu.max").and_return("max 100000\n")
expect(counter.cpu_quota).to be_nil
end

it 'returns a float if cgroups v2 sets a limit' do
expect(RbConfig::CONFIG).to receive(:[]).with("target_os").and_return("linux")
expect(File).to receive(:exist?).with("/sys/fs/cgroup/cpu.max").and_return(true)
# 150000 / 100000 => 1.5 CPUs
expect(File).to receive(:read).with("/sys/fs/cgroup/cpu.max").and_return("150000 100000\n")
expect(counter.cpu_quota).to be == 1.5
end

it 'returns nil if cgroups v1 sets no limit' do
expect(RbConfig::CONFIG).to receive(:[]).with("target_os").and_return("linux")
# No v2 file, so the implementation falls through to the v1 path.
expect(File).to receive(:exist?).with("/sys/fs/cgroup/cpu.max").and_return(false)
expect(File).to receive(:exist?).with("/sys/fs/cgroup/cpu,cpuacct/cpu.cfs_quota_us").and_return(true)

# NOTE(review): "max\n" is not numeric, so String#to_i turns it into 0.
# The kernel's CFS bandwidth documentation describes -1 as the v1 value
# for "no limit" -- consider adding an example that stubs "-1\n".
expect(File).to receive(:read).with("/sys/fs/cgroup/cpu,cpuacct/cpu.cfs_quota_us").and_return("max\n")
expect(counter.cpu_quota).to be_nil
end

it 'returns a float if cgroups v1 sets a limit' do
expect(RbConfig::CONFIG).to receive(:[]).with("target_os").and_return("linux")
expect(File).to receive(:exist?).with("/sys/fs/cgroup/cpu.max").and_return(false)
expect(File).to receive(:exist?).with("/sys/fs/cgroup/cpu,cpuacct/cpu.cfs_quota_us").and_return(true)

# quota 150000us over a 100000us period => 1.5 CPUs
expect(File).to receive(:read).with("/sys/fs/cgroup/cpu,cpuacct/cpu.cfs_quota_us").and_return("150000\n")
expect(File).to receive(:read).with("/sys/fs/cgroup/cpu,cpuacct/cpu.cfs_period_us").and_return("100000\n")
expect(counter.cpu_quota).to be == 1.5
end

end

# Specs for Concurrent.available_processor_count. The quota is stubbed on
# the shared processor counter; the processor count is the real one of the
# host, so expectations are written relative to it.
RSpec.describe '#available_processor_count' do

# No quota: the plain processor count comes back, converted to a Float.
it 'returns #processor_count if #cpu_quota is nil' do
expect(Concurrent::processor_counter).to receive(:cpu_quota).and_return(nil)
available_processor_count = Concurrent::available_processor_count
expect(available_processor_count).to be == Concurrent::processor_count
expect(available_processor_count).to be_a Float
end

# A quota above the processor count is capped to the processor count.
it 'returns #processor_count if #cpu_quota is higher' do
expect(Concurrent::processor_counter).to receive(:cpu_quota).and_return(Concurrent::processor_count.to_f * 2)
available_processor_count = Concurrent::available_processor_count
expect(available_processor_count).to be == Concurrent::processor_count
expect(available_processor_count).to be_a Float
end

# A quota below the processor count is returned as is.
it 'returns #cpu_quota if #cpu_quota is lower than #processor_count' do
expect(Concurrent::processor_counter).to receive(:cpu_quota).and_return(Concurrent::processor_count.to_f / 2)
available_processor_count = Concurrent::available_processor_count
expect(available_processor_count).to be == Concurrent::processor_count.to_f / 2
expect(available_processor_count).to be_a Float
end

end
end

0 comments on commit 165b061

Please sign in to comment.