From e5fe1dc0f7e1b6a469dee8b3eacae310062349d2 Mon Sep 17 00:00:00 2001 From: Will Jordan Date: Fri, 1 May 2020 15:44:58 -0700 Subject: [PATCH] Add fork_worker option and refork command Trigger refork for improved copy-on-write performance. --- History.md | 1 + docs/fork_worker.md | 31 ++++++++ docs/signals.md | 1 + lib/puma/app/status.rb | 5 ++ lib/puma/cli.rb | 6 ++ lib/puma/cluster.rb | 127 +++++++++++++++++++++++++++---- lib/puma/control_cli.rb | 5 +- lib/puma/dsl.rb | 14 ++++ lib/puma/server.rb | 4 +- test/test_integration_cluster.rb | 9 ++- 10 files changed, 183 insertions(+), 20 deletions(-) create mode 100644 docs/fork_worker.md diff --git a/History.md b/History.md index 8067a8e12e..596d0387b8 100644 --- a/History.md +++ b/History.md @@ -10,6 +10,7 @@ * Increases maximum URI path length from 2048 to 8196 bytes (#2167) * Force shutdown responses can be overridden by using the `lowlevel_error_handler` config (#2203) * Faster phased restart and worker timeout (#2121) + * Add `fork_worker` option and `refork` command for improved copy-on-write performance (#2099) * Deprecations, Removals and Breaking API Changes * `Puma.stats` now returns a Hash instead of a JSON string (#2086) diff --git a/docs/fork_worker.md b/docs/fork_worker.md new file mode 100644 index 0000000000..8782f9d679 --- /dev/null +++ b/docs/fork_worker.md @@ -0,0 +1,31 @@ +# Fork-Worker Cluster Mode [Experimental] + +Puma 5 introduces an experimental new cluster-mode configuration option, `fork_worker` (`--fork-worker` from the CLI). This mode causes Puma to fork additional workers from worker 0, instead of directly from the master process: + +``` +10000 \_ puma 4.3.3 (tcp://0.0.0.0:9292) [puma] +10001 \_ puma: cluster worker 0: 10000 [puma] +10002 \_ puma: cluster worker 1: 10000 [puma] +10003 \_ puma: cluster worker 2: 10000 [puma] +10004 \_ puma: cluster worker 3: 10000 [puma] +``` + +Similar to the `preload_app!` option, the `fork_worker` option allows your application to be initialized only once for copy-on-write memory savings, and it has two additional advantages: + +1. **Compatible with phased restart.** Because the master process itself doesn't preload the application, this mode works with phased restart (`SIGUSR1` or `pumactl phased-restart`). When worker 0 reloads as part of a phased restart, it initializes a new copy of your application first, then the other workers reload by forking from this new worker already containing the new preloaded application. + + This allows a phased restart to complete as quickly as a hot restart (`SIGUSR2` or `pumactl restart`), while still minimizing downtime by staggering the restart across cluster workers. + +2. **'Refork' for additional copy-on-write improvements in running applications.** Fork-worker mode introduces a new `refork` command that re-loads all nonzero workers by re-forking them from worker 0. + + This command can potentially improve memory utilization in large or complex applications that don't fully pre-initialize on startup, because the re-forked workers can share copy-on-write memory with a worker that has been running for a while and serving requests. + + You can trigger a refork by sending the cluster the `SIGURG` signal or running the `pumactl refork` command at any time. A refork will also automatically trigger once, after a certain number of requests have been processed by worker 0 (default 1000). To configure the number of requests before the auto-refork, pass a positive integer argument to `fork_workers` (e.g., `fork_workers 1000`), or `0` to disable. + +### Limitations + +- This mode is still very experimental so there may be bugs or edge-cases, particularly around expected behavior of existing hooks. Please open a [bug report](https://github.com/puma/puma/issues/new?template=bug_report.md) if you encounter any issues. + +- In order to fork new workers cleanly, worker 0 shuts down its server and stops serving requests so there are no open file descriptors or other kinds of shared global state between processes, and to maximize copy-on-write efficiency across the newly-forked workers. This may temporarily reduce total capacity of the cluster during a phased restart / refork. + + In a cluster with `n` workers, a normal phased restart stops and restarts workers one by one while the application is loaded in each process, so `n-1` workers are available serving requests during the restart. In a phased restart in fork-worker mode, the application is first loaded in worker 0 while `n-1` workers are available, then worker 0 remains stopped while the rest of the workers are reloaded one by one, leaving only `n-2` workers to be available for a brief period of time. Reloading the rest of the workers should be quick because the application is preloaded at that point, but there may be situations where it can take longer (slow clients, long-running application code, slow worker-fork hooks, etc). \ No newline at end of file diff --git a/docs/signals.md b/docs/signals.md index 3625c72bda..9661aa5956 100644 --- a/docs/signals.md +++ b/docs/signals.md @@ -41,6 +41,7 @@ Puma cluster responds to these signals: - `HUP` reopen log files defined in stdout_redirect configuration parameter. If there is no stdout_redirect option provided it will behave like `INT` - `INT` equivalent of sending Ctrl-C to cluster. Will attempt to finish then exit. - `CHLD` +- `URG` refork workers in phases from worker 0, if `fork_workers` option is enabled. ## Callbacks order in case of different signals diff --git a/lib/puma/app/status.rb b/lib/puma/app/status.rb index 3a6518ee78..efffc43cff 100644 --- a/lib/puma/app/status.rb +++ b/lib/puma/app/status.rb @@ -63,6 +63,11 @@ def call(env) end rack_response(200, backtraces.to_json) + + when /\/refork$/ + Process.kill "SIGURG", $$ + rack_response(200, OK_STATUS) + else rack_response 404, "Unsupported action", 'text/plain' end diff --git a/lib/puma/cli.rb b/lib/puma/cli.rb index 45c421b1bc..c4aac2be62 100644 --- a/lib/puma/cli.rb +++ b/lib/puma/cli.rb @@ -130,6 +130,12 @@ def setup_options user_config.environment arg end + o.on "-f", "--fork-worker=[REQUESTS]", OptionParser::DecimalInteger, + "Fork new workers from existing worker. Cluster mode only", + "Auto-refork after REQUESTS (default 1000)" do |*args| + user_config.fork_worker *args.compact + end + o.on "-I", "--include PATH", "Specify $LOAD_PATH directories" do |arg| $LOAD_PATH.unshift(*arg.split(':')) end diff --git a/lib/puma/cluster.rb b/lib/puma/cluster.rb index 7a3d3f7fa7..7e78d97a67 100644 --- a/lib/puma/cluster.rb +++ b/lib/puma/cluster.rb @@ -37,7 +37,7 @@ def stop_workers begin loop do wait_workers - break if @workers.empty? + break if @workers.reject {|w| w.pid.nil?}.empty? sleep 0.2 end rescue Interrupt @@ -78,6 +78,7 @@ def initialize(idx, pid, phase, options) end attr_reader :index, :pid, :phase, :signal, :last_checkin, :last_status, :started_at + attr_writer :pid, :phase def booted? @stage == :booted @@ -113,7 +114,7 @@ def term @term ||= true @first_term_sent ||= Time.now end - Process.kill @signal, @pid + Process.kill @signal, @pid if @pid rescue Errno::ESRCH end end @@ -134,23 +135,43 @@ def spawn_workers return if diff < 1 master = Process.pid + if @options[:fork_worker] + @fork_writer << "-1\n" + end diff.times do idx = next_worker_index - @launcher.config.run_hooks :before_worker_fork, idx, @launcher.events - pid = fork { worker(idx, master) } - if !pid - log "! Complete inability to spawn new workers detected" - log "! Seppuku is the only choice." - exit! 1 + if @options[:fork_worker] && idx != 0 + @fork_writer << "#{idx}\n" + pid = nil + else + pid = spawn_worker(idx, master) end debug "Spawned worker: #{pid}" @workers << Worker.new(idx, pid, @phase, @options) + end - @launcher.config.run_hooks :after_worker_fork, idx, @launcher.events + if @options[:fork_worker] && + @workers.all? {|x| x.phase == @phase} + + @fork_writer << "0\n" + end + end + + def spawn_worker(idx, master) + @launcher.config.run_hooks :before_worker_fork, idx, @launcher.events + + pid = fork { worker(idx, master) } + if !pid + log "! Complete inability to spawn new workers detected" + log "! Seppuku is the only choice." + exit! 1 end + + @launcher.config.run_hooks :after_worker_fork, idx, @launcher.events + pid end def cull_workers @@ -213,7 +234,6 @@ def check_workers def wakeup! return unless @wakeup - @next_check = Time.now begin @wakeup.write "!" unless @wakeup.closed? @@ -229,9 +249,14 @@ def worker(index, master) Signal.trap "SIGINT", "IGNORE" + fork_worker = @options[:fork_worker] && index == 0 + @workers = [] - @master_read.close - @suicide_pipe.close + if !@options[:fork_worker] || fork_worker + @master_read.close + @suicide_pipe.close + @fork_writer.close + end Thread.new do Puma.set_thread_name "worker check pipe" @@ -254,15 +279,45 @@ def worker(index, master) # things in shape before booting the app. @launcher.config.run_hooks :before_worker_boot, index, @launcher.events - server = start_server + server = @server ||= start_server + restart_server = Queue.new << true << false + + if fork_worker + restart_server.clear + Signal.trap "SIGCHLD" do + Process.wait(-1, Process::WNOHANG) rescue nil + wakeup! + end + + Thread.new do + Puma.set_thread_name "worker fork pipe" + while (idx = @fork_pipe.gets) + idx = idx.to_i + if idx == -1 # stop server + if restart_server.length > 0 + restart_server.clear + server.begin_restart(true) + @launcher.config.run_hooks :before_refork, nil, @launcher.events + GC.compact if GC.respond_to?(:compact) + end + elsif idx == 0 # restart server + restart_server << true << false + else # fork worker + pid = spawn_worker(idx, master) + @worker_write << "f#{pid}:#{idx}\n" rescue nil + end + end + end + end Signal.trap "SIGTERM" do @worker_write << "e#{Process.pid}\n" rescue nil server.stop + restart_server << false end begin - @worker_write << "b#{Process.pid}\n" + @worker_write << "b#{Process.pid}:#{index}\n" rescue SystemCallError, IOError Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue STDERR.puts "Master seems to have exited, exiting." @@ -283,7 +338,7 @@ def worker(index, master) end end - server.run.join + server.run.join while restart_server.pop # Invoke any worker shutdown hooks so they can prevent the worker # exiting until any background operations are completed @@ -360,9 +415,31 @@ def preload? @options[:preload_app] end + def fork_worker! + if (worker = @workers.find { |w| w.index == 0 }) + worker.phase += 1 + end + phased_restart + end + # We do this in a separate method to keep the lambda scope # of the signals handlers as small as possible. def setup_signals + if @options[:fork_worker] + Signal.trap "SIGURG" do + fork_worker! + end + + # Auto-fork after the specified number of requests. + if (fork_requests = @options[:fork_worker].to_i) > 0 + @launcher.events.register(:ping!) do |w| + fork_worker! if w.index == 0 && + w.phase == 0 && + w.last_status[:requests_count] >= fork_requests + end + end + end + Signal.trap "SIGCHLD" do wakeup! end @@ -446,6 +523,10 @@ def run # @check_pipe, @suicide_pipe = Puma::Util.pipe + # Separate pipe used by worker 0 to receive commands to + # fork new worker processes. + @fork_pipe, @fork_writer = Puma::Util.pipe + log "Use Ctrl-C to stop" redirect_io @@ -484,11 +565,18 @@ def run if res req = read.read_nonblock(1) + @next_check = Time.now if req == "!" next if !req || req == "!" result = read.gets pid = result.to_i + if req == "b" || req == "f" + pid, idx = result.split(':').map(&:to_i) + w = @workers.find {|x| x.index == idx} + w.pid = pid if w.pid.nil? + end + if w = @workers.find { |x| x.pid == pid } case req when "b" @@ -502,6 +590,7 @@ def run w.term unless w.term? when "p" w.ping!(result.sub(/^\d+/,'').chomp) + @launcher.events.fire(:ping!, w) end else log "! Out-of-sync worker list, no #{pid} worker" @@ -528,6 +617,7 @@ def run # `#term` if needed def wait_workers @workers.reject! do |w| + next false if w.pid.nil? begin if Process.wait(w.pid, Process::WNOHANG) true @@ -536,7 +626,12 @@ def wait_workers nil end rescue Errno::ECHILD - true # child is already terminated + begin + Process.kill(0, w.pid) + false # child still alive, but has another parent + rescue Errno::ESRCH, Errno::EPERM + true # child is already terminated + end end end end diff --git a/lib/puma/control_cli.rb b/lib/puma/control_cli.rb index 063b74e188..54a5de1f6b 100644 --- a/lib/puma/control_cli.rb +++ b/lib/puma/control_cli.rb @@ -11,7 +11,7 @@ module Puma class ControlCLI - COMMANDS = %w{halt restart phased-restart start stats status stop reload-worker-directory gc gc-stats thread-backtraces} + COMMANDS = %w{halt restart phased-restart start stats status stop reload-worker-directory gc gc-stats thread-backtraces refork} PRINTABLE_COMMANDS = %w{gc-stats stats thread-backtraces} def initialize(argv, stdout=STDOUT, stderr=STDERR) @@ -239,6 +239,9 @@ def send_signal return + when "refork" + Process.kill "SIGURG", @pid + else return end diff --git a/lib/puma/dsl.rb b/lib/puma/dsl.rb index 41e03a1a7e..9dd3c2d475 100644 --- a/lib/puma/dsl.rb +++ b/lib/puma/dsl.rb @@ -705,5 +705,19 @@ def set_remote_address(val=:socket) end end + # When enabled, workers will be forked from worker 0 instead of from the master process. + # This option is similar to `preload_app` because the app is preloaded before forking, + # but it is compatible with phased restart. + # + # This option also enables the `refork` command (SIGURG), which optimizes copy-on-write performance + # in a running app. + # + # A refork will automatically trigger once after the specified number of requests + # (default 1000), or pass 0 to disable auto refork. + # + # @note Cluster mode only. + def fork_worker(after_requests=1000) + @options[:fork_worker] = Integer(after_requests) + end end end diff --git a/lib/puma/server.rb b/lib/puma/server.rb index a467e23715..73cd9f74c1 100644 --- a/lib/puma/server.rb +++ b/lib/puma/server.rb @@ -258,6 +258,7 @@ def run(background=true) end def handle_servers + @check, @notify = Puma::Util.pipe if @notify.closed? begin check = @check sockets = [check] + @binder.ios @@ -910,8 +911,9 @@ def halt(sync=false) @thread.join if @thread && sync end - def begin_restart + def begin_restart(sync=false) notify_safely(RESTART_COMMAND) + @thread.join if @thread && sync end def fast_write(io, str) diff --git a/test/test_integration_cluster.rb b/test/test_integration_cluster.rb index d16dff55bb..79335a97fd 100644 --- a/test/test_integration_cluster.rb +++ b/test/test_integration_cluster.rb @@ -75,6 +75,11 @@ def test_usr1_all_respond_tcp usr1_all_respond unix: false end + def test_usr1_fork_worker + skip_unless_signal_exist? :USR1 + usr1_all_respond config: '--fork-worker' + end + def test_usr1_all_respond_unix skip_unless_signal_exist? :USR1 usr1_all_respond unix: true @@ -239,8 +244,8 @@ def term_closes_listeners(unix: false) # Send requests 1 per second. Send 1, then :USR1 server, then send another 24. # All should be responded to, and at least three workers should be used - def usr1_all_respond(unix: false) - cli_server "-w #{WORKERS} -t 0:5 -q test/rackup/sleep_pid.ru", unix: unix + def usr1_all_respond(unix: false, config: '') + cli_server "-w #{WORKERS} -t 0:5 -q test/rackup/sleep_pid.ru #{config}", unix: unix threads = [] replies = [] mutex = Mutex.new