Skip to content

Commit

Permalink
Add fork_worker option for improved copy-on-write performance
Browse files Browse the repository at this point in the history
  • Loading branch information
wjordan committed Feb 13, 2020
1 parent 6f59df8 commit 52f40e9
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 12 deletions.
1 change: 1 addition & 0 deletions History.md
Expand Up @@ -5,6 +5,7 @@
* Configuration: `environment` is read from `RAILS_ENV`, if `RACK_ENV` can't be found (#2022)
* `Puma.stats` now returns a Hash instead of a JSON string (#2086)
* `GC.compact` is called before fork if available (#2093)
* Add `fork_worker` option for improved copy-on-write performance (#2099)

* Bugfixes
* Your bugfix goes here (#Github Number)
Expand Down
5 changes: 5 additions & 0 deletions lib/puma/cli.rb
Expand Up @@ -140,6 +140,11 @@ def setup_options
user_config.environment arg
end

o.on "-f", "--fork-worker",
"Fork new workers from existing worker. Cluster mode only" do
user_config.fork_worker
end

o.on "-I", "--include PATH", "Specify $LOAD_PATH directories" do |arg|
$LOAD_PATH.unshift(*arg.split(':'))
end
Expand Down
58 changes: 48 additions & 10 deletions lib/puma/cluster.rb
Expand Up @@ -78,6 +78,7 @@ def initialize(idx, pid, phase, options)
end

attr_reader :index, :pid, :phase, :signal, :last_checkin, :last_status, :started_at
attr_writer :pid

def booted?
@stage == :booted
Expand Down Expand Up @@ -139,11 +140,11 @@ def spawn_workers
idx = next_worker_index
@launcher.config.run_hooks :before_worker_fork, idx

pid = fork { worker(idx, master) }
if !pid
log "! Complete inability to spawn new workers detected"
log "! Seppuku is the only choice."
exit! 1
if @options[:fork_worker] && @workers.find {|x| x.index == 0}
@fork_writer << "#{idx}\n"
pid = nil
else
pid = spawn_worker(idx, master)
end

debug "Spawned worker: #{pid}"
Expand All @@ -157,6 +158,16 @@ def spawn_workers
end
end

def spawn_worker(idx, master)
pid = fork { worker(idx, master) }
if !pid
log "! Complete inability to spawn new workers detected"
log "! Seppuku is the only choice."
exit! 1
end
pid
end

def cull_workers
diff = @workers.size - @options[:workers]
return if diff < 1
Expand Down Expand Up @@ -212,7 +223,9 @@ def check_workers(force=false)
# we need to phase any workers out (which will restart
# in the right phase).
#
w = @workers.find { |x| x.phase != @phase }
w = @workers.
reject { |x| @options[:fork_worker] && x.index == 0 }.
find { |x| x.phase != @phase }

if w
if @phased_state == :idle
Expand Down Expand Up @@ -248,10 +261,19 @@ def worker(index, master)
@workers = []
@master_read.close
@suicide_pipe.close
@fork_writer.close

Thread.new do
Puma.set_thread_name "worker check pipe"
IO.select [@check_pipe]
if index == 0
while (idx = @fork_pipe.gets)
pid = spawn_worker(idx.to_i, master)
Process.detach(pid)
@worker_write << "f#{pid}:#{idx}" rescue nil
end
else
IO.select [@check_pipe]
end
log "! Detected parent died, dying"
exit! 1
end
Expand All @@ -278,7 +300,7 @@ def worker(index, master)
end

begin
@worker_write << "b#{Process.pid}\n"
@worker_write << "b#{Process.pid}:#{index}\n"
rescue SystemCallError, IOError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
STDERR.puts "Master seems to have exited, exiting."
Expand Down Expand Up @@ -321,7 +343,7 @@ def restart
end

def phased_restart
return false if @options[:preload_app]
return false if @options[:preload_app] && !@options[:fork_worker]

@phased_restart = true
wakeup!
Expand Down Expand Up @@ -468,6 +490,10 @@ def run
#
@check_pipe, @suicide_pipe = Puma::Util.pipe

# Separate pipe used by worker 0 to receive commands to
# fork new worker processes.
@fork_pipe, @fork_writer = Puma::Util.pipe

if daemon?
log "* Daemonizing..."
Process.daemon(true)
Expand Down Expand Up @@ -520,6 +546,12 @@ def run
result = read.gets
pid = result.to_i

if req == "b" || req == "f"
pid, idx = result.split(':').map(&:to_i)
w = @workers.find {|x| x.index == idx}
w.pid = pid if w.pid.nil?
end

if w = @workers.find { |x| x.pid == pid }
case req
when "b"
Expand Down Expand Up @@ -560,6 +592,7 @@ def run
# `#term` if needed
def wait_workers
@workers.reject! do |w|
return false if w.pid.nil?
begin
if Process.wait(w.pid, Process::WNOHANG)
true
Expand All @@ -568,7 +601,12 @@ def wait_workers
nil
end
rescue Errno::ECHILD
true # child is already terminated
begin
Process.kill(0, w.pid)
false # child still alive, but has another parent
rescue Errno::ESRCH, Errno::EPERM
true # child is already terminated
end
end
end
end
Expand Down
10 changes: 10 additions & 0 deletions lib/puma/dsl.rb
Expand Up @@ -736,5 +736,15 @@ def set_remote_address(val=:socket)
end
end

# When set, new workers will be forked from worker 0 directly.
# This can be used to optimize copy-on-write performance for running apps.
#
# This also prevents phased restart from restarting worker 0, so
# phased restart can be used to improve memory usage on a running app.
#
# @note Cluster mode only.
def fork_worker(answer=true)
@options[:fork_worker] = answer
end
end
end
9 changes: 7 additions & 2 deletions test/test_integration_cluster.rb
Expand Up @@ -75,6 +75,11 @@ def test_usr1_all_respond_tcp
usr1_all_respond unix: false
end

def test_usr1_fork_worker
skip_unless_signal_exist? :USR1
usr1_all_respond config: '--fork-worker'
end

def test_usr1_all_respond_unix
skip_unless_signal_exist? :USR1
usr1_all_respond unix: true
Expand Down Expand Up @@ -203,8 +208,8 @@ def term_closes_listeners(unix: false)

# Send requests 1 per second. Send 1, then :USR1 server, then send another 24.
# All should be responded to, and at least three workers should be used
def usr1_all_respond(unix: false)
cli_server "-w #{WORKERS} -t 0:5 -q test/rackup/sleep_pid.ru", unix: unix
def usr1_all_respond(unix: false, config: '')
cli_server "-w #{WORKERS} -t 0:5 -q test/rackup/sleep_pid.ru #{config}", unix: unix
threads = []
replies = []
mutex = Mutex.new
Expand Down

0 comments on commit 52f40e9

Please sign in to comment.