Skip to content

Commit

Permalink
Extract calls to purge_interrupt_queue (#2716)
Browse files Browse the repository at this point in the history
* Extract calls to purge_interrupt_queue

Move calls to Thread#purge_interrupt_queue to a module function. This
means if/when this pattern needs to change it can change in one place
instead of a dozen or more places.

* Update comment on purge_interrupt_queue [ci skip]
  • Loading branch information
jacobherrington committed Oct 1, 2021
1 parent cf991f6 commit 20dc923
Show file tree
Hide file tree
Showing 8 changed files with 20 additions and 13 deletions.
2 changes: 1 addition & 1 deletion lib/puma/client.rb
Expand Up @@ -162,7 +162,7 @@ def close
begin
@io.close
rescue IOError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
Puma::Util.purge_interrupt_queue
end
end

Expand Down
4 changes: 2 additions & 2 deletions lib/puma/cluster/worker.rb
Expand Up @@ -106,7 +106,7 @@ def run
begin
@worker_write << "b#{Process.pid}:#{index}\n"
rescue SystemCallError, IOError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
Puma::Util.purge_interrupt_queue
STDERR.puts "Master seems to have exited, exiting."
return
end
Expand All @@ -127,7 +127,7 @@ def run
payload = %Q!#{base_payload}{ "backlog":#{b}, "running":#{r}, "pool_capacity":#{t}, "max_threads": #{m}, "requests_count": #{rc} }\n!
io << payload
rescue IOError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
Puma::Util.purge_interrupt_queue
break
end
sleep Const::WORKER_CHECK_INTERVAL
Expand Down
2 changes: 1 addition & 1 deletion lib/puma/minissl.rb
Expand Up @@ -169,7 +169,7 @@ def close
end
end
rescue IOError, SystemCallError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
Puma::Util.purge_interrupt_queue
# nothing
ensure
@socket.close
Expand Down
2 changes: 1 addition & 1 deletion lib/puma/runner.rb
Expand Up @@ -24,7 +24,7 @@ def wakeup!
@wakeup.write "!" unless @wakeup.closed?

rescue SystemCallError, IOError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
Puma::Util.purge_interrupt_queue
end

def development?
Expand Down
12 changes: 6 additions & 6 deletions lib/puma/server.rb
Expand Up @@ -146,7 +146,7 @@ def cork_socket(socket)
begin
skt.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_CORK, 1) if skt.kind_of? TCPSocket
rescue IOError, SystemCallError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
Puma::Util.purge_interrupt_queue
end
end

Expand All @@ -155,7 +155,7 @@ def uncork_socket(socket)
begin
skt.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_CORK, 0) if skt.kind_of? TCPSocket
rescue IOError, SystemCallError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
Puma::Util.purge_interrupt_queue
end
end
else
Expand All @@ -176,7 +176,7 @@ def closed_socket?(socket)
begin
tcp_info = skt.getsockopt(Socket::IPPROTO_TCP, Socket::TCP_INFO)
rescue IOError, SystemCallError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
Puma::Util.purge_interrupt_queue
@precheck_closing = false
false
else
Expand Down Expand Up @@ -491,7 +491,7 @@ def process_client(client, buffer)
begin
client.close if close_socket
rescue IOError, SystemCallError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
Puma::Util.purge_interrupt_queue
# Already closed
rescue StandardError => e
@events.unknown_error e, nil, "Client"
Expand Down Expand Up @@ -583,11 +583,11 @@ def notify_safely(message)
@notify << message
rescue IOError, NoMethodError, Errno::EPIPE
# The server, in another thread, is shutting down
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
Puma::Util.purge_interrupt_queue
rescue RuntimeError => e
# Temporary workaround for https://bugs.ruby-lang.org/issues/13239
if e.message.include?('IOError')
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
Puma::Util.purge_interrupt_queue
else
raise e
end
Expand Down
7 changes: 7 additions & 0 deletions lib/puma/util.rb
Expand Up @@ -10,6 +10,13 @@ def pipe
IO.pipe
end

# An instance method on Thread has been provided to address https://bugs.ruby-lang.org/issues/13632,
# which currently effects some older versions of Ruby: 2.2.7 2.2.8 2.2.9 2.2.10 2.3.4 2.4.1
# Additional context: https://github.com/puma/puma/pull/1345
def purge_interrupt_queue
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
end

# Unescapes a URI escaped string with +encoding+. +encoding+ will be the
# target encoding of the string returned, and it defaults to UTF-8
if defined?(::Encoding)
Expand Down
2 changes: 1 addition & 1 deletion test/test_busy_worker.rb
Expand Up @@ -17,7 +17,7 @@ def teardown
def new_connection
TCPSocket.new('127.0.0.1', @port).tap {|s| @ios << s}
rescue IOError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
Puma::Util.purge_interrupt_queue
retry
end

Expand Down
2 changes: 1 addition & 1 deletion test/test_out_of_band_server.rb
Expand Up @@ -21,7 +21,7 @@ def teardown
def new_connection
TCPSocket.new('127.0.0.1', @port).tap {|s| @ios << s}
rescue IOError
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
Puma::Util.purge_interrupt_queue
retry
end

Expand Down

0 comments on commit 20dc923

Please sign in to comment.