From a290f1447c65605d239f4e751c8f5a27cfc990ec Mon Sep 17 00:00:00 2001 From: Will Jordan Date: Thu, 30 Apr 2020 00:01:51 -0700 Subject: [PATCH] Better error handling during force shutdown --- History.md | 1 + lib/puma/client.rb | 16 ++++----- lib/puma/server.rb | 75 ++++++++++++++++++++++++++-------------- lib/puma/thread_pool.rb | 23 ++++++++++-- test/test_puma_server.rb | 69 ++++++++++++++++++------------------ test/test_thread_pool.rb | 12 ++++--- 6 files changed, 121 insertions(+), 75 deletions(-) diff --git a/History.md b/History.md index e34abaa3a1..fc9e422baa 100644 --- a/History.md +++ b/History.md @@ -45,6 +45,7 @@ * Fix `UserFileDefaultOptions#fetch` to properly use `default` (#2233) * Improvements to `out_of_band` hook (#2234) * Prefer the rackup file specified by the CLI (#2225) + * Better error handling during force shutdown (#2271) * Refactor * Remove unused loader argument from Plugin initializer (#2095) diff --git a/lib/puma/client.rb b/lib/puma/client.rb index 324947b4b0..a412034e20 100644 --- a/lib/puma/client.rb +++ b/lib/puma/client.rb @@ -244,19 +244,17 @@ def eagerly_finish end # IS_JRUBY def finish(timeout) - return true if @ready + return if @ready until try_to_finish - can_read = begin - IO.select([@to_io], nil, nil, timeout) - rescue ThreadPool::ForceShutdown - nil - end - unless can_read - write_error(408) if in_data_phase + unless IO.select([@to_io], nil, nil, timeout) + timeout_error raise ConnectionError end end - true + end + + def timeout_error + write_error(408) if in_data_phase end def write_error(status_code) diff --git a/lib/puma/server.rb b/lib/puma/server.rb index 70095577eb..025e9425ee 100644 --- a/lib/puma/server.rb +++ b/lib/puma/server.rb @@ -88,6 +88,7 @@ def initialize(app, events=Events.stdio, options={}) @precheck_closing = true @requests_count = 0 + @queue_mutex = Mutex.new end attr_accessor :binder, :leak_stack_on_error, :early_hints @@ -194,12 +195,18 @@ def run(background=true) process_now = false begin - if @queue_requests - process_now = client.eagerly_finish - else - client.finish(@first_data_timeout) - process_now = true + unless @queue_requests && + !client.eagerly_finish && + queue_client(client, @first_data_timeout) + + @thread_pool.with_force_shutdown do + client.finish(@first_data_timeout) + end + process_now = process_client(client, buffer) end + rescue ThreadPool::ForceShutdown + client.timeout_error + client.close rescue MiniSSL::SSLError => e ssl_socket = client.io addr = ssl_socket.peeraddr.last @@ -215,13 +222,6 @@ def run(background=true) @events.parse_error self, client.env, e rescue ConnectionError, EOFError client.close - else - if process_now - process_client client, buffer - else - client.set_timeout @first_data_timeout - @reactor.add client - end end process_now @@ -262,7 +262,6 @@ def handle_servers check = @check sockets = [check] + @binder.ios pool = @thread_pool - queue_requests = @queue_requests remote_addr_value = nil remote_addr_header = nil @@ -306,7 +305,7 @@ def handle_servers @events.fire :state, @status - if queue_requests + if_queue_requests do @queue_requests = false @reactor.clear! @reactor.shutdown @@ -344,12 +343,33 @@ def handle_check return false end + # Runs the provided block if the Reactor queue is enabled. + # Returns block return value if queue is enabled, or nil if queue is not enabled. + def if_queue_requests + @queue_mutex.synchronize do + if @queue_requests + yield + end + end + end + private :if_queue_requests + + # Queues the provided client + def queue_client(client, timeout) + if_queue_requests do + client.set_timeout timeout + @reactor.add client + end + end + private :queue_client + # Given a connection on +client+, handle the incoming requests. # # This method support HTTP Keep-Alive so it may, depending on if the client # indicates that it supports keep alive, wait for another request before # returning. # + # Returns true if one or more complete requests were successfully handled. def process_client(client, buffer) begin @@ -359,7 +379,7 @@ def process_client(client, buffer) requests = 0 while true - case handle_request(client, buffer) + case handle_request(client, buffer).tap {requests += 1} when false return when :async @@ -370,8 +390,6 @@ def process_client(client, buffer) ThreadPool.clean_thread_locals if clean_thread_locals - requests += 1 - check_for_more_data = @status == :run if requests >= MAX_FAST_INLINE @@ -382,18 +400,19 @@ def process_client(client, buffer) check_for_more_data = false end - unless client.reset(check_for_more_data) - return unless @queue_requests - close_socket = false - client.set_timeout @persistent_timeout - @reactor.add client + next_request_ready = @thread_pool.with_force_shutdown do + client.reset(check_for_more_data) + end + + unless next_request_ready + close_socket = false if queue_client(client, @persistent_timeout) return end end end - # The client disconnected while we were reading data - rescue ConnectionError + # The client disconnected or threadpool shut down while we were reading data + rescue ConnectionError, ThreadPool::ForceShutdown # Swallow them. The ensure tries to close +client+ down # SSL handshake error @@ -435,6 +454,8 @@ def process_client(client, buffer) rescue StandardError => e @events.unknown_error self, e, "Client" end + + return requests.to_i > 0 end end @@ -571,7 +592,9 @@ def handle_request(req, lines) begin begin - status, headers, res_body = @app.call(env) + status, headers, res_body = @thread_pool.with_force_shutdown do + @app.call(env) + end return :async if req.hijacked @@ -869,7 +892,7 @@ def graceful_shutdown if @thread_pool if timeout = @options[:force_shutdown_after] - @thread_pool.shutdown timeout.to_i + @thread_pool.shutdown timeout.to_f else @thread_pool.shutdown end diff --git a/lib/puma/thread_pool.rb b/lib/puma/thread_pool.rb index c0bbedb1c8..d52595237d 100644 --- a/lib/puma/thread_pool.rb +++ b/lib/puma/thread_pool.rb @@ -62,6 +62,9 @@ def initialize(min, max, *extra, &block) end @clean_thread_locals = false + + @force_shutdown = false + @shutdown_mutex = Mutex.new end attr_reader :spawned, :trim_requested, :waiting @@ -315,6 +318,19 @@ def auto_reap!(timeout=5) @reaper.start! end + # Allows ThreadPool::ForceShutdown to be raised within the + # provided block if the thread is forced to shutdown during execution. + def with_force_shutdown + t = Thread.current + @shutdown_mutex.synchronize do + raise ForceShutdown if @force_shutdown + t[:with_force_shutdown] = true + end + yield + ensure + t[:with_force_shutdown] = false + end + # Tell all threads in the pool to exit and wait for them to finish. # Wait +timeout+ seconds then raise +ForceShutdown+ in remaining threads. # Next, wait an extra +grace+ seconds then force-kill remaining threads. @@ -349,8 +365,11 @@ def shutdown(timeout=-1) join.call(timeout) # If threads are still running, raise ForceShutdown and wait to finish. - threads.each do |t| - t.raise ForceShutdown + @shutdown_mutex.synchronize do + @force_shutdown = true + threads.each do |t| + t.raise ForceShutdown if t[:with_force_shutdown] + end end join.call(SHUTDOWN_GRACE_TIME) diff --git a/test/test_puma_server.rb b/test/test_puma_server.rb index 252525744d..0afb9bce58 100644 --- a/test/test_puma_server.rb +++ b/test/test_puma_server.rb @@ -881,27 +881,37 @@ def assert_does_not_allow_http_injection(app, opts = {}) end # Perform a server shutdown while requests are pending (one in app-server response, one still sending client request). - def shutdown_requests(app_delay: 2, request_delay: 1, post: false, response:, **options) + def shutdown_requests(s1_complete: true, s1_response: nil, post: false, s2_response: nil, **options) @server = Puma::Server.new @app, @events, options - server_run app: ->(_) { - sleep app_delay + mutex = Mutex.new + app_finished = ConditionVariable.new + server_run app: ->(env) { + path = env['REQUEST_PATH'] + mutex.synchronize do + app_finished.signal + app_finished.wait(mutex) if path == '/s1' + end [204, {}, []] } - s1 = send_http "GET / HTTP/1.1\r\n\r\n" + s1 = nil s2 = send_http post ? - "POST / HTTP/1.1\r\nHost: test.com\r\nContent-Type: text/plain\r\nContent-Length: 5\r\n\r\nhi!" : - "GET / HTTP/1.1\r\n" - sleep 0.1 - + "POST /s2 HTTP/1.1\r\nHost: test.com\r\nContent-Type: text/plain\r\nContent-Length: 5\r\n\r\nhi!" : + "GET /s2 HTTP/1.1\r\n" + mutex.synchronize do + s1 = send_http "GET /s1 HTTP/1.1\r\n\r\n" + app_finished.wait(mutex) + app_finished.signal if s1_complete + end @server.stop - sleep request_delay + Thread.pass until @server.instance_variable_get(:@thread_pool).instance_variable_get(:@shutdown) - s2 << "\r\n" + assert_match(s1_response, s1.gets) if s1_response - assert_match(/204/, s1.gets) + # Send s2 after shutdown begins + s2 << "\r\n" unless IO.select([s2], nil, nil, 0.1) - assert IO.select([s2], nil, nil, app_delay), 'timeout waiting for response' + assert IO.select([s2], nil, nil, 10), 'timeout waiting for response' s2_result = begin s2.gets rescue Errno::ECONNABORTED, Errno::ECONNRESET @@ -909,38 +919,29 @@ def shutdown_requests(app_delay: 2, request_delay: 1, post: false, response:, ** post ? '408' : nil end - if response - assert_match response, s2_result + if s2_response + assert_match s2_response, s2_result else assert_nil s2_result end + ensure + @server.stop(true) end - # Shutdown should allow pending requests to complete. + # Shutdown should allow pending requests and app-responses to complete. def test_shutdown_requests - shutdown_requests response: /204/ - shutdown_requests response: /204/, queue_requests: false - end - - # Requests stuck longer than `first_data_timeout` should have connection closed (408 w/pending POST body). - def test_shutdown_data_timeout - shutdown_requests request_delay: 3, first_data_timeout: 2, response: nil - shutdown_requests request_delay: 3, first_data_timeout: 2, response: nil, queue_requests: false - shutdown_requests request_delay: 3, first_data_timeout: 2, response: /408/, post: true + opts = {s1_response: /204/, s2_response: /204/} + shutdown_requests **opts + shutdown_requests **opts, queue_requests: false end # Requests still pending after `force_shutdown_after` should have connection closed (408 w/pending POST body). + # App-responses still pending should return 503 (uncaught Puma::ThreadPool::ForceShutdown exception). def test_force_shutdown - shutdown_requests request_delay: 4, response: nil, force_shutdown_after: 3 - shutdown_requests request_delay: 4, response: nil, force_shutdown_after: 3, queue_requests: false - shutdown_requests request_delay: 4, response: /408/, force_shutdown_after: 3, post: true - end - - # App-responses still pending during `force_shutdown_after` should return 503 - # (uncaught Puma::ThreadPool::ForceShutdown exception). - def test_force_shutdown_app - shutdown_requests app_delay: 3, response: /503/, force_shutdown_after: 3 - shutdown_requests app_delay: 3, response: /503/, force_shutdown_after: 3, queue_requests: false + opts = {s1_complete: false, s1_response: /503/, s2_response: nil, force_shutdown_after: 0} + shutdown_requests **opts + shutdown_requests **opts, queue_requests: false + shutdown_requests **opts, post: true, s2_response: /408/ end def test_http11_connection_header_queue diff --git a/test/test_thread_pool.rb b/test/test_thread_pool.rb index edb753fa91..7cdc47a9fa 100644 --- a/test/test_thread_pool.rb +++ b/test/test_thread_pool.rb @@ -222,8 +222,10 @@ def test_force_shutdown_immediately pool = mutex_pool(0, 1) do begin - pool.signal - sleep + pool.with_force_shutdown do + pool.signal + sleep + end rescue Puma::ThreadPool::ForceShutdown rescued = true end @@ -248,8 +250,10 @@ def test_shutdown_with_grace rescued = [] pool = mutex_pool(2, 2) do begin - pool.signal - sleep + pool.with_force_shutdown do + pool.signal + sleep + end rescue Puma::ThreadPool::ForceShutdown rescued << Thread.current sleep