Experiment: Fork from existing worker process to optimize copy-on-write performance #2099

Merged: 3 commits on May 11, 2020
1 change: 1 addition & 0 deletions History.md
@@ -10,6 +10,7 @@
* Increases maximum URI path length from 2048 to 8196 bytes (#2167)
* Force shutdown responses can be overridden by using the `lowlevel_error_handler` config (#2203)
* Faster phased restart and worker timeout (#2121)
* Add `fork_worker` option and `refork` command for improved copy-on-write performance (#2099)

* Deprecations, Removals and Breaking API Changes
* `Puma.stats` now returns a Hash instead of a JSON string (#2086)
31 changes: 31 additions & 0 deletions docs/fork_worker.md
@@ -0,0 +1,31 @@
# Fork-Worker Cluster Mode [Experimental]

Puma 5 introduces an experimental new cluster-mode configuration option, `fork_worker` (`--fork-worker` from the CLI). This mode causes Puma to fork additional workers from worker 0, instead of directly from the master process:

```
10000   \_ puma 4.3.3 (tcp://0.0.0.0:9292) [puma]
10001       \_ puma: cluster worker 0: 10000 [puma]
10002           \_ puma: cluster worker 1: 10000 [puma]
10003           \_ puma: cluster worker 2: 10000 [puma]
10004           \_ puma: cluster worker 3: 10000 [puma]
```

Similar to the `preload_app!` option, the `fork_worker` option allows your application to be initialized only once for copy-on-write memory savings, and it has two additional advantages:

1. **Compatible with phased restart.** Because the master process itself doesn't preload the application, this mode works with phased restart (`SIGUSR1` or `pumactl phased-restart`). When worker 0 reloads as part of a phased restart, it initializes a new copy of your application first, then the other workers reload by forking from this new worker already containing the new preloaded application.

This allows a phased restart to complete as quickly as a hot restart (`SIGUSR2` or `pumactl restart`), while still minimizing downtime by staggering the restart across cluster workers.

2. **'Refork' for additional copy-on-write improvements in running applications.** Fork-worker mode introduces a new `refork` command that re-loads all nonzero workers by re-forking them from worker 0.

This command can potentially improve memory utilization in large or complex applications that don't fully pre-initialize on startup, because the re-forked workers can share copy-on-write memory with a worker that has been running for a while and serving requests.

You can trigger a refork at any time by sending the cluster the `SIGURG` signal or running the `pumactl refork` command. A refork also triggers automatically, once, after worker 0 has processed a certain number of requests (default 1000). To configure the number of requests before the auto-refork, pass a positive integer argument to `fork_worker` (e.g., `fork_worker 1000`), or `0` to disable.
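
As a rough illustration of the options described above, a Puma config file might look like the following. The worker count and request threshold are arbitrary values chosen for the example, not recommendations; `fork_worker` and `on_refork` are the DSL methods added in this pull request.

```ruby
# config/puma.rb (illustrative values only)
workers 4

# Fork workers 1..3 from worker 0, and auto-refork once after
# worker 0 has handled 2000 requests. `fork_worker 0` keeps the
# mode enabled but disables the automatic refork.
fork_worker 2000

# Runs in worker 0 before the other workers are re-forked from it.
on_refork do
  3.times { GC.start }
end
```

The same mode can be enabled from the command line with `--fork-worker`, optionally followed by the request count.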

### Limitations

- This mode is still very experimental so there may be bugs or edge-cases, particularly around expected behavior of existing hooks. Please open a [bug report](https://github.com/puma/puma/issues/new?template=bug_report.md) if you encounter any issues.

- To fork new workers cleanly, worker 0 shuts down its server and stops serving requests. This ensures that no open file descriptors or other shared global state leak between processes, and it maximizes copy-on-write efficiency across the newly forked workers. As a result, the total capacity of the cluster may drop temporarily during a phased restart or refork.

In a cluster with `n` workers, a normal phased restart stops and restarts workers one by one while the application is loaded in each process, so `n-1` workers are available to serve requests during the restart. In a phased restart in fork-worker mode, the application is first loaded in worker 0 while `n-1` workers are available, then worker 0 remains stopped while the rest of the workers are reloaded one by one, leaving only `n-2` workers available for a brief period of time. Reloading the rest of the workers should be quick because the application is preloaded at that point, but there may be situations where it takes longer (slow clients, long-running application code, slow worker-fork hooks, etc.).
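
The capacity arithmetic above can be sketched as a small simulation. This is a simplified model, not Puma code: `availability_timeline` is a hypothetical helper that records how many workers are serving requests at each step of a restart, assuming every step takes effect atomically.

```ruby
# Hypothetical helper: number of workers serving requests at each
# step of a phased restart in a cluster of n workers.
def availability_timeline(n, fork_worker: false)
  timeline = []
  if fork_worker
    # Step 1: worker 0 reloads the application; the others keep serving.
    timeline << n - 1
    # Steps 2..n: worker 0 stays paused while each remaining worker
    # is re-forked from it, one at a time.
    (1...n).each { timeline << n - 2 }
  else
    # Normal phased restart: one worker is down at any given step.
    n.times { timeline << n - 1 }
  end
  timeline << n # restart complete, all workers serving again
end

p availability_timeline(4)                    # => [3, 3, 3, 3, 4]
p availability_timeline(4, fork_worker: true) # => [3, 2, 2, 2, 4]
```

In this model the minimum capacity during a fork-worker restart is `n-2`, so very small clusters are the most exposed to the temporary dip.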
1 change: 1 addition & 0 deletions docs/signals.md
@@ -41,6 +41,7 @@ Puma cluster responds to these signals:
- `HUP` reopen log files defined in stdout_redirect configuration parameter. If there is no stdout_redirect option provided it will behave like `INT`
- `INT` equivalent of sending Ctrl-C to cluster. Will attempt to finish then exit.
- `CHLD`
- `URG` refork workers in phases from worker 0, if the `fork_worker` option is enabled.
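
As a minimal sketch of the signal mechanics, assuming a POSIX system, the snippet below traps `URG` in the current process and sends the signal to itself. The `reforks` counter is a stand-in for the real refork logic; against a running cluster you would instead send the signal to the Puma master PID, as `pumactl refork` does with `Process.kill "SIGURG", @pid`.

```ruby
# Stand-in for the cluster's refork handler: just count deliveries.
reforks = 0
Signal.trap("URG") { reforks += 1 }

# Send URG to ourselves; a real caller would target the master PID.
Process.kill("URG", Process.pid)

# Ruby runs trap handlers asynchronously, so wait briefly for delivery.
deadline = Time.now + 2
sleep 0.01 until reforks > 0 || Time.now > deadline

# Restore the default disposition (URG is ignored by default).
Signal.trap("URG", "DEFAULT")
```

`URG` is a convenient choice for an optional command because its default action is to be ignored, so sending it to a process without a handler is harmless.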

## Callbacks order in case of different signals

5 changes: 5 additions & 0 deletions lib/puma/app/status.rb
@@ -63,6 +63,11 @@ def call(env)
end

rack_response(200, backtraces.to_json)

when /\/refork$/
Process.kill "SIGURG", $$
rack_response(200, OK_STATUS)

else
rack_response 404, "Unsupported action", 'text/plain'
end
6 changes: 6 additions & 0 deletions lib/puma/cli.rb
@@ -130,6 +130,12 @@ def setup_options
user_config.environment arg
end

o.on "-f", "--fork-worker=[REQUESTS]", OptionParser::DecimalInteger,
"Fork new workers from existing worker. Cluster mode only",
"Auto-refork after REQUESTS (default 1000)" do |*args|
user_config.fork_worker *args.compact
end

o.on "-I", "--include PATH", "Specify $LOAD_PATH directories" do |arg|
$LOAD_PATH.unshift(*arg.split(':'))
end
127 changes: 111 additions & 16 deletions lib/puma/cluster.rb
@@ -37,7 +37,7 @@ def stop_workers
begin
loop do
wait_workers
break if @workers.empty?
break if @workers.reject {|w| w.pid.nil?}.empty?
sleep 0.2
end
rescue Interrupt
@@ -78,6 +78,7 @@ def initialize(idx, pid, phase, options)
end

attr_reader :index, :pid, :phase, :signal, :last_checkin, :last_status, :started_at
attr_writer :pid, :phase

def booted?
@stage == :booted
@@ -113,7 +114,7 @@ def term
@term ||= true
@first_term_sent ||= Time.now
end
Process.kill @signal, @pid
Process.kill @signal, @pid if @pid
rescue Errno::ESRCH
end
end
@@ -134,23 +135,43 @@ def spawn_workers
return if diff < 1

master = Process.pid
if @options[:fork_worker]
@fork_writer << "-1\n"
end

diff.times do
idx = next_worker_index
@launcher.config.run_hooks :before_worker_fork, idx, @launcher.events

pid = fork { worker(idx, master) }
if !pid
log "! Complete inability to spawn new workers detected"
log "! Seppuku is the only choice."
exit! 1
if @options[:fork_worker] && idx != 0
@fork_writer << "#{idx}\n"
pid = nil
else
pid = spawn_worker(idx, master)
end

debug "Spawned worker: #{pid}"
@workers << Worker.new(idx, pid, @phase, @options)
end

@launcher.config.run_hooks :after_worker_fork, idx, @launcher.events
if @options[:fork_worker] &&
@workers.all? {|x| x.phase == @phase}

@fork_writer << "0\n"
end
end

def spawn_worker(idx, master)
@launcher.config.run_hooks :before_worker_fork, idx, @launcher.events

pid = fork { worker(idx, master) }
if !pid
log "! Complete inability to spawn new workers detected"
log "! Seppuku is the only choice."
exit! 1
end

@launcher.config.run_hooks :after_worker_fork, idx, @launcher.events
pid
end

def cull_workers
@@ -213,7 +234,6 @@ def check_workers

def wakeup!
return unless @wakeup
@next_check = Time.now

begin
@wakeup.write "!" unless @wakeup.closed?
@@ -229,9 +249,14 @@ def worker(index, master)

Signal.trap "SIGINT", "IGNORE"

fork_worker = @options[:fork_worker] && index == 0

@workers = []
@master_read.close
@suicide_pipe.close
if !@options[:fork_worker] || fork_worker
@master_read.close
@suicide_pipe.close
@fork_writer.close
end

Thread.new do
Puma.set_thread_name "worker check pipe"
@@ -254,15 +279,45 @@ def worker(index, master)
# things in shape before booting the app.
@launcher.config.run_hooks :before_worker_boot, index, @launcher.events

server = start_server
server = @server ||= start_server
restart_server = Queue.new << true << false

if fork_worker
restart_server.clear
Signal.trap "SIGCHLD" do
Process.wait(-1, Process::WNOHANG) rescue nil
wakeup!
end

Thread.new do
Puma.set_thread_name "worker fork pipe"
while (idx = @fork_pipe.gets)
idx = idx.to_i
if idx == -1 # stop server
if restart_server.length > 0
restart_server.clear
server.begin_restart(true)
@launcher.config.run_hooks :before_refork, nil, @launcher.events
GC.compact if GC.respond_to?(:compact)
end
elsif idx == 0 # restart server
restart_server << true << false
else # fork worker
pid = spawn_worker(idx, master)
@worker_write << "f#{pid}:#{idx}\n" rescue nil
end
end
end
end

Signal.trap "SIGTERM" do
@worker_write << "e#{Process.pid}\n" rescue nil
server.stop
restart_server << false
end

begin
@worker_write << "b#{Process.pid}\n"
@worker_write << "b#{Process.pid}:#{index}\n"
rescue SystemCallError, IOError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
STDERR.puts "Master seems to have exited, exiting."
@@ -283,7 +338,7 @@ def worker(index, master)
end
end

server.run.join
server.run.join while restart_server.pop

# Invoke any worker shutdown hooks so they can prevent the worker
# exiting until any background operations are completed
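
The `server.run.join while restart_server.pop` loop in this hunk uses a `Queue` of booleans as a run/stop switch. The following is a hedged, single-threaded sketch of that idea, not the worker code itself: `fake_run` is a hypothetical stand-in for `server.run.join`, and the refork is simulated inline on the first run.

```ruby
# true means "run the server (again)", false means "exit the loop".
restart_server = Queue.new << true << false
runs = []

# Hypothetical stand-in for `server.run.join`.
fake_run = lambda do
  runs << runs.size + 1
  if runs.size == 1
    # Simulate a refork: drop the pending `false` (the "stop server"
    # command), then queue another run (the "restart server" command).
    restart_server.clear
    restart_server << true << false
  end
end

# The modifier form evaluates `pop` before every iteration.
fake_run.call while restart_server.pop

p runs # => [1, 2]
```

Because `Queue#pop` blocks on an empty queue, clearing the queue parks the loop until a later command pushes new values, which is what lets worker 0 pause between stopping and restarting its server.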
@@ -360,9 +415,31 @@ def preload?
@options[:preload_app]
end

def fork_worker!
if (worker = @workers.find { |w| w.index == 0 })
worker.phase += 1
end
phased_restart
end

# We do this in a separate method to keep the lambda scope
# of the signals handlers as small as possible.
def setup_signals
if @options[:fork_worker]
Signal.trap "SIGURG" do
fork_worker!
end

# Auto-fork after the specified number of requests.
if (fork_requests = @options[:fork_worker].to_i) > 0
@launcher.events.register(:ping!) do |w|
fork_worker! if w.index == 0 &&
w.phase == 0 &&
w.last_status[:requests_count] >= fork_requests
end
end
end

Signal.trap "SIGCHLD" do
wakeup!
end
@@ -446,6 +523,10 @@ def run
#
@check_pipe, @suicide_pipe = Puma::Util.pipe

# Separate pipe used by worker 0 to receive commands to
# fork new worker processes.
@fork_pipe, @fork_writer = Puma::Util.pipe

log "Use Ctrl-C to stop"

redirect_io
@@ -484,11 +565,18 @@ def run
if res
req = read.read_nonblock(1)

@next_check = Time.now if req == "!"
next if !req || req == "!"

result = read.gets
pid = result.to_i

if req == "b" || req == "f"
pid, idx = result.split(':').map(&:to_i)
w = @workers.find {|x| x.index == idx}
w.pid = pid if w.pid.nil?
end

if w = @workers.find { |x| x.pid == pid }
case req
when "b"
@@ -502,6 +590,7 @@ def run
w.term unless w.term?
when "p"
w.ping!(result.sub(/^\d+/,'').chomp)
@launcher.events.fire(:ping!, w)
end
else
log "! Out-of-sync worker list, no #{pid} worker"
@@ -528,6 +617,7 @@ def run
# `#term` if needed
def wait_workers
@workers.reject! do |w|
next false if w.pid.nil?
begin
if Process.wait(w.pid, Process::WNOHANG)
true
@@ -536,7 +626,12 @@ def wait_workers
nil
end
rescue Errno::ECHILD
true # child is already terminated
begin
Process.kill(0, w.pid)
false # child still alive, but has another parent
rescue Errno::ESRCH, Errno::EPERM
true # child is already terminated
end
end
end
end
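
The changes to `lib/puma/cluster.rb` above have the master drive worker 0 through a line-based protocol on the fork pipe: `-1` stops the server, a positive index asks for that worker to be forked, and `0` restarts the server. Here is a minimal sketch of that protocol using plain `IO.pipe` and a reader thread. It only records the decoded commands in an `events` array (a name made up for this example) and does not fork anything.

```ruby
# Illustrative only: decode the fork-pipe commands into symbols.
fork_pipe, fork_writer = IO.pipe
events = []

reader = Thread.new do
  while (line = fork_pipe.gets)
    idx = line.to_i
    case idx
    when -1 then events << :stop_server      # pause worker 0's server
    when 0  then events << :restart_server   # resume serving requests
    else         events << [:fork_worker, idx]
    end
  end
end

# What the master side writes during one refork cycle of 4 workers.
fork_writer << "-1\n"
(1..3).each { |idx| fork_writer << "#{idx}\n" }
fork_writer << "0\n"
fork_writer.close # EOF ends the reader loop
reader.join

p events
# => [:stop_server, [:fork_worker, 1], [:fork_worker, 2], [:fork_worker, 3], :restart_server]
```

In the pull request the master writes `0` only once every worker has reached the current phase, so the restart command may arrive in a later `spawn_workers` pass than the stop command.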
5 changes: 4 additions & 1 deletion lib/puma/control_cli.rb
@@ -11,7 +11,7 @@
module Puma
class ControlCLI

COMMANDS = %w{halt restart phased-restart start stats status stop reload-worker-directory gc gc-stats thread-backtraces}
COMMANDS = %w{halt restart phased-restart start stats status stop reload-worker-directory gc gc-stats thread-backtraces refork}
PRINTABLE_COMMANDS = %w{gc-stats stats thread-backtraces}

def initialize(argv, stdout=STDOUT, stderr=STDERR)
@@ -239,6 +239,9 @@ def send_signal

return

when "refork"
Process.kill "SIGURG", @pid

else
return
end
36 changes: 36 additions & 0 deletions lib/puma/dsl.rb
@@ -492,6 +492,28 @@ def after_worker_fork(&block)

alias_method :after_worker_boot, :after_worker_fork

# When `fork_worker` is enabled, code to run in Worker 0
# before all other workers are re-forked from this process,
# after the server has temporarily stopped serving requests
# (once per complete refork cycle).
#
# This can be used to trigger extra garbage-collection to maximize
# copy-on-write efficiency, or close any connections to remote servers
# (database, Redis, ...) that were opened while the server was running.
#
# This can be called multiple times to add several hooks.
#
# @note Cluster mode with `fork_worker` enabled only.
# @example
# on_refork do
# 3.times {GC.start}
# end

def on_refork(&block)
@options[:before_refork] ||= []
@options[:before_refork] << block
end

# Code to run out-of-band when the worker is idle.
# These hooks run immediately after a request has finished
# processing and there are no busy threads on the worker.
@@ -705,5 +727,19 @@ def set_remote_address(val=:socket)
end
end

# When enabled, workers will be forked from worker 0 instead of from the master process.
# This option is similar to `preload_app` because the app is preloaded before forking,
# but it is compatible with phased restart.
#
# This option also enables the `refork` command (SIGURG), which optimizes copy-on-write performance
# in a running app.
#
# A refork will automatically trigger once after the specified number of requests
# (default 1000), or pass 0 to disable auto refork.
#
# @note Cluster mode only.
def fork_worker(after_requests=1000)
@options[:fork_worker] = Integer(after_requests)
end
end
end
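
To see how the two new DSL methods behave in isolation, here is a small stand-alone sketch. `MiniConfig` and its `run_hooks` method are hypothetical simplifications written for this example (Puma's real hook runner takes additional arguments); only the bodies of `on_refork` and `fork_worker` mirror the diff above.

```ruby
# Hypothetical, stripped-down stand-in for Puma's DSL object.
class MiniConfig
  attr_reader :options

  def initialize
    @options = {}
  end

  # Each call appends another hook; hooks run in registration order.
  def on_refork(&block)
    @options[:before_refork] ||= []
    @options[:before_refork] << block
  end

  # Integer() accepts integers and numeric strings, and raises
  # ArgumentError for anything else.
  def fork_worker(after_requests = 1000)
    @options[:fork_worker] = Integer(after_requests)
  end

  # Simplified hook runner, assumed for this example.
  def run_hooks(key)
    (@options[key] || []).each(&:call)
  end
end

config = MiniConfig.new
calls = []
config.on_refork { calls << :gc }
config.on_refork { calls << :disconnect }
config.run_hooks(:before_refork)
config.fork_worker("250")

p calls                        # => [:gc, :disconnect]
p config.options[:fork_worker] # => 250
```

Since `0` is truthy in Ruby, `fork_worker 0` still leaves `@options[:fork_worker]` truthy: the mode stays on, and only the `> 0` check in `setup_signals` skips registering the auto-refork.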