Skip to content

Commit

Permalink
Add fork_worker option and refork command
Browse files Browse the repository at this point in the history
Trigger refork for improved copy-on-write performance.
  • Loading branch information
wjordan committed Feb 21, 2020
1 parent 6531e97 commit 873d089
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 13 deletions.
1 change: 1 addition & 0 deletions History.md
Expand Up @@ -8,6 +8,7 @@
* `GC.compact` is called before fork if available (#2093)
* Add `requests_count` to workers stats. (#2106)
* Faster phased restart and worker timeout
* Add `fork_worker` option and `refork` command for improved copy-on-write performance (#2099)

* Bugfixes
* Your bugfix goes here (#Github Number)
Expand Down
5 changes: 5 additions & 0 deletions lib/puma/app/status.rb
Expand Up @@ -63,6 +63,11 @@ def call(env)
end

rack_response(200, backtraces.to_json)

when /\/refork$/
Process.kill "SIGURG", $$
rack_response(200, OK_STATUS)

else
rack_response 404, "Unsupported action", 'text/plain'
end
Expand Down
5 changes: 5 additions & 0 deletions lib/puma/cli.rb
Expand Up @@ -140,6 +140,11 @@ def setup_options
user_config.environment arg
end

o.on "-f", "--fork-worker",
"Fork new workers from existing worker. Cluster mode only" do
user_config.fork_worker
end

o.on "-I", "--include PATH", "Specify $LOAD_PATH directories" do |arg|
$LOAD_PATH.unshift(*arg.split(':'))
end
Expand Down
74 changes: 64 additions & 10 deletions lib/puma/cluster.rb
Expand Up @@ -77,6 +77,7 @@ def initialize(idx, pid, phase, options)
end

attr_reader :index, :pid, :phase, :signal, :last_checkin, :last_status, :started_at
attr_writer :pid, :phase

def booted?
@stage == :booted
Expand Down Expand Up @@ -116,7 +117,7 @@ def term
@term ||= true
@first_term_sent ||= Time.now
end
Process.kill @signal, @pid
Process.kill @signal, @pid if @pid
rescue Errno::ESRCH
end
end
Expand All @@ -142,11 +143,11 @@ def spawn_workers
idx = next_worker_index
@launcher.config.run_hooks :before_worker_fork, idx

pid = fork { worker(idx, master) }
if !pid
log "! Complete inability to spawn new workers detected"
log "! Seppuku is the only choice."
exit! 1
if @options[:fork_worker] && idx != 0
@fork_writer << "#{idx}\n"
pid = nil
else
pid = spawn_worker(idx, master)
end

debug "Spawned worker: #{pid}"
Expand All @@ -156,6 +157,16 @@ def spawn_workers
end
end

def spawn_worker(idx, master)
pid = fork { worker(idx, master) }
if !pid
log "! Complete inability to spawn new workers detected"
log "! Seppuku is the only choice."
exit! 1
end
pid
end

def cull_workers
diff = @workers.size - @options[:workers]
return if diff < 1
Expand Down Expand Up @@ -233,8 +244,11 @@ def worker(index, master)
Signal.trap "SIGINT", "IGNORE"

@workers = []
@master_read.close
@suicide_pipe.close
if !@options[:fork_worker] || index == 0
@master_read.close
@suicide_pipe.close
@fork_writer.close
end

Thread.new do
Puma.set_thread_name "worker check pipe"
Expand All @@ -259,13 +273,29 @@ def worker(index, master)

server = start_server

if index == 0
Signal.trap "SIGCHLD" do
Process.wait(-1, Process::WNOHANG) rescue nil
wakeup!
end

Thread.new do
Puma.set_thread_name "worker fork pipe"
while (idx = @fork_pipe.gets)
idx = idx.to_i
pid = spawn_worker(idx, master)
@worker_write << "f#{pid}:#{idx}\n" rescue nil
end
end
end

Signal.trap "SIGTERM" do
@worker_write << "e#{Process.pid}\n" rescue nil
server.stop
end

begin
@worker_write << "b#{Process.pid}\n"
@worker_write << "b#{Process.pid}:#{index}\n"
rescue SystemCallError, IOError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
STDERR.puts "Master seems to have exited, exiting."
Expand Down Expand Up @@ -373,6 +403,13 @@ def preload?
# We do this in a separate method to keep the lambda scope
# of the signals handlers as small as possible.
def setup_signals
Signal.trap "SIGURG" do
if (worker = @workers.find {|w| w.index == 0})
worker.phase += 1
end
phased_restart
end

Signal.trap "SIGCHLD" do
wakeup!
end
Expand Down Expand Up @@ -456,6 +493,10 @@ def run
#
@check_pipe, @suicide_pipe = Puma::Util.pipe

# Separate pipe used by worker 0 to receive commands to
# fork new worker processes.
@fork_pipe, @fork_writer = Puma::Util.pipe

if daemon?
log "* Daemonizing..."
Process.daemon(true)
Expand Down Expand Up @@ -504,6 +545,12 @@ def run
result = read.gets
pid = result.to_i

if req == "b" || req == "f"
pid, idx = result.split(':').map(&:to_i)
w = @workers.find {|x| x.index == idx}
w.pid = pid if w.pid.nil?
end

if w = @workers.find { |x| x.pid == pid }
case req
when "b"
Expand All @@ -515,6 +562,7 @@ def run
w.instance_variable_set :@term, true
when "t"
w.term unless w.term?
@next_check = Time.now + 0.1
when "p"
w.ping!(result.sub(/^\d+/,'').chomp)
end
Expand Down Expand Up @@ -543,6 +591,7 @@ def run
# `#term` if needed
def wait_workers
@workers.reject! do |w|
next false if w.pid.nil?
begin
if Process.wait(w.pid, Process::WNOHANG)
true
Expand All @@ -551,7 +600,12 @@ def wait_workers
nil
end
rescue Errno::ECHILD
true # child is already terminated
begin
Process.kill(0, w.pid)
false # child still alive, but has another parent
rescue Errno::ESRCH, Errno::EPERM
true # child is already terminated
end
end
end
end
Expand Down
5 changes: 4 additions & 1 deletion lib/puma/control_cli.rb
Expand Up @@ -11,7 +11,7 @@
module Puma
class ControlCLI

COMMANDS = %w{halt restart phased-restart start stats status stop reload-worker-directory gc gc-stats thread-backtraces}
COMMANDS = %w{halt restart phased-restart start stats status stop reload-worker-directory gc gc-stats thread-backtraces refork}
PRINTABLE_COMMANDS = %w{gc-stats stats thread-backtraces}

def initialize(argv, stdout=STDOUT, stderr=STDERR)
Expand Down Expand Up @@ -232,6 +232,9 @@ def send_signal

return

when "refork"
Process.kill "SIGURG", @pid

else
return
end
Expand Down
10 changes: 10 additions & 0 deletions lib/puma/dsl.rb
Expand Up @@ -736,5 +736,15 @@ def set_remote_address(val=:socket)
end
end

# When set, new workers will be forked from worker 0 directly.
# This can be used to optimize copy-on-write performance for running apps.
#
# This also prevents phased restart from restarting worker 0, so
# phased restart can be used to improve memory usage on a running app.
#
# @note Cluster mode only.
def fork_worker(answer=true)
@options[:fork_worker] = answer
end
end
end
9 changes: 7 additions & 2 deletions test/test_integration_cluster.rb
Expand Up @@ -75,6 +75,11 @@ def test_usr1_all_respond_tcp
usr1_all_respond unix: false
end

def test_usr1_fork_worker
skip_unless_signal_exist? :USR1
usr1_all_respond config: '--fork-worker'
end

def test_usr1_all_respond_unix
skip_unless_signal_exist? :USR1
usr1_all_respond unix: true
Expand Down Expand Up @@ -239,8 +244,8 @@ def term_closes_listeners(unix: false)

# Send requests 1 per second. Send 1, then :USR1 server, then send another 24.
# All should be responded to, and at least three workers should be used
def usr1_all_respond(unix: false)
cli_server "-w #{WORKERS} -t 0:5 -q test/rackup/sleep_pid.ru", unix: unix
def usr1_all_respond(unix: false, config: '')
cli_server "-w #{WORKERS} -t 0:5 -q test/rackup/sleep_pid.ru #{config}", unix: unix
threads = []
replies = []
mutex = Mutex.new
Expand Down

0 comments on commit 873d089

Please sign in to comment.