From 76efeab569b9392ff2e5fb88de8520b5a17de17c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Burgos=20Maci=C3=A1?= Date: Fri, 12 Jul 2019 15:31:48 -0400 Subject: [PATCH 1/9] Creates on_before_rack & on_after_rack hooks --- lib/puma/plugin.rb | 4 +++ lib/puma/reactor.rb | 16 +++++++++--- lib/puma/server.rb | 47 +++++++++++++++++++++++++++++++++++ lib/puma/stream_client.rb | 52 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 116 insertions(+), 3 deletions(-) create mode 100644 lib/puma/stream_client.rb diff --git a/lib/puma/plugin.rb b/lib/puma/plugin.rb index 571cff098f..c04bfc42cd 100644 --- a/lib/puma/plugin.rb +++ b/lib/puma/plugin.rb @@ -57,6 +57,10 @@ def find(name) raise UnknownPlugin, "file failed to register a plugin" end + def each + @plugins.each_value { |plugin| yield plugin } + end + def add_background(blk) @background << blk end diff --git a/lib/puma/reactor.rb b/lib/puma/reactor.rb index 0d419cfd0c..a7c35693ba 100644 --- a/lib/puma/reactor.rb +++ b/lib/puma/reactor.rb @@ -216,9 +216,19 @@ def run_internal end begin - if c.try_to_finish - @app_pool << c - clear_monitor mon + if c.respond_to?(:stream?) && c.stream? + if c.read_more + @app_pool << c + end + + if c.closed? + clear_monitor mon + end + else + if c.try_to_finish + @app_pool << c + clear_monitor mon + end end # Don't report these to the lowlevel_error handler, otherwise diff --git a/lib/puma/server.rb b/lib/puma/server.rb index ac1e28674a..e97cff89e6 100644 --- a/lib/puma/server.rb +++ b/lib/puma/server.rb @@ -85,6 +85,19 @@ def initialize(app, events=Events.stdio, options={}) @mode = :http @precheck_closing = true + + @on_before_rack = nil + @on_after_rack = nil + Plugins.each do |plugin| + if plugin.respond_to? :on_before_rack + @on_before_rack ||= [] + @on_before_rack << plugin.method(:on_before_rack) + end + if plugin.respond_to? :on_after_rack + @on_after_rack ||= [] + @on_after_rack << plugin.method(:on_after_rack) + end + end end attr_accessor :binder, :leak_stack_on_error, :early_hints @@ -296,6 +309,12 @@ def run(background=true) @max_threads, IOBuffer) do |client, buffer| + if client.respond_to? :churn + more_to_churn = client.churn + @thread_pool << client if more_to_churn + next + end + # Advertise this server into the thread Thread.current[ThreadLocalKey] = self @@ -652,6 +671,10 @@ def handle_request(req, lines) # after_reply = env[RACK_AFTER_REPLY] = [] + unless @on_before_rack.nil? + @on_before_rack.each { |hook| hook.call(env) } + end + begin begin status, headers, res_body = @app.call(env) @@ -681,6 +704,30 @@ def handle_request(req, lines) status, headers, res_body = lowlevel_error(e, env) end + unless @on_after_rack.nil? + async_handler = nil + + @on_after_rack.reverse_each do |hook| + stream_client = hook.call(env, headers, req.io) + + if stream_client.respond_to?(:stream?) && stream_client.stream? + if async_handler.nil? + if stream_client.read_more + @thread_pool << stream_client + end + + @reactor.add stream_client + + async_handler = stream_client + else + raise "Only one #on_after_rack hook can take the socket over" + end + end + end + + return :async if async_handler + end + content_length = nil no_body = head diff --git a/lib/puma/stream_client.rb b/lib/puma/stream_client.rb new file mode 100644 index 0000000000..dc3fc74771 --- /dev/null +++ b/lib/puma/stream_client.rb @@ -0,0 +1,52 @@ +module Puma + # This serves as a base class for any plugin that wants to take over the + # socket and wait on IO. + # + # The subclasses are expected to implement these two methods: `#read_more` & + # `#churn`. + # + # The underlying socket is available through `@io`. + class StreamClient + def initialize(io) + @io = io + end + + def to_io + @io + end + + def stream? + true + end + + def timeout_at + false + end + + def close + @io.close + end + + def closed? + @io.closed? + end + + # This method will be invoked when the IO descriptor has new data pending + # to be read. You can read from `@io` at this time. + # + # You can return a truthy value to add this client to the thread pool. + def read_more + raise NotImplementedError + end + + # This is the method that the thread pool will be consuming. A "churn" is + # enqueued on the thread pool each time `#churn` or `#read_more` return a + # truthy value. + # + # You can return a truthy value to add this client to the thread pool + # again. + def churn + raise NotImplementedError + end + end +end From e8c8d685d01ca1723e77bfc7fb583ea0b5f664ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Burgos=20Maci=C3=A1?= Date: Mon, 15 Jul 2019 19:00:46 -0400 Subject: [PATCH 2/9] Trying to have a stable interface... --- lib/puma/client.rb | 4 ++++ lib/puma/reactor.rb | 30 ++++++++++++++++-------------- lib/puma/server.rb | 17 +++++++++-------- lib/puma/stream_client.rb | 33 ++++++++++++++++++++++----------- 4 files changed, 51 insertions(+), 33 deletions(-) diff --git a/lib/puma/client.rb b/lib/puma/client.rb index 0aa4425e5a..8564ad8495 100644 --- a/lib/puma/client.rb +++ b/lib/puma/client.rb @@ -141,6 +141,10 @@ def close end end + def on_shutdown + close + end + # The object used for a request with no body. All requests with # no body share this one object since it has no state. EmptyBody = NullIO.new diff --git a/lib/puma/reactor.rb b/lib/puma/reactor.rb index a7c35693ba..20f7cdb890 100644 --- a/lib/puma/reactor.rb +++ b/lib/puma/reactor.rb @@ -189,7 +189,7 @@ def run_internal if submon.value == @ready false else - submon.value.close + submon.value.on_shutdown begin selector.deregister submon.value rescue IOError @@ -215,20 +215,22 @@ def run_internal end end - begin - if c.respond_to?(:stream?) && c.stream? - if c.read_more - @app_pool << c - end + if c.is_a? StreamClient + if c.on_read_ready + @app_pool << c + end - if c.closed? - clear_monitor mon - end - else - if c.try_to_finish - @app_pool << c - clear_monitor mon - end + if c.closed? + clear_monitor mon + end + + next + end + + begin + if c.try_to_finish + @app_pool << c + clear_monitor mon end # Don't report these to the lowlevel_error handler, otherwise diff --git a/lib/puma/server.rb b/lib/puma/server.rb index e97cff89e6..b2249f8092 100644 --- a/lib/puma/server.rb +++ b/lib/puma/server.rb @@ -312,6 +312,7 @@ def run(background=true) if client.respond_to? :churn more_to_churn = client.churn @thread_pool << client if more_to_churn + next end @@ -705,27 +706,27 @@ def handle_request(req, lines) end unless @on_after_rack.nil? - async_handler = nil + is_async = false @on_after_rack.reverse_each do |hook| stream_client = hook.call(env, headers, req.io) - if stream_client.respond_to?(:stream?) && stream_client.stream? - if async_handler.nil? - if stream_client.read_more + if stream_client.is_a? StreamClient + if is_async + raise "Only one #on_after_rack hook should take over" + else + if stream_client.on_read_ready @thread_pool << stream_client end @reactor.add stream_client - async_handler = stream_client - else - raise "Only one #on_after_rack hook can take the socket over" + is_async = true end end end - return :async if async_handler + return :async if is_async end content_length = nil diff --git a/lib/puma/stream_client.rb b/lib/puma/stream_client.rb index dc3fc74771..228151d13e 100644 --- a/lib/puma/stream_client.rb +++ b/lib/puma/stream_client.rb @@ -2,8 +2,12 @@ module Puma # This serves as a base class for any plugin that wants to take over the # socket and wait on IO. # - # The subclasses are expected to implement these two methods: `#read_more` & - # `#churn`. + # The subclasses are expected to implement: + # + # 1. Methods to respond to changes in the underlying socket. These are + # `#on_read_ready`, `#on_shutdown` & `#on_broken_pipe`. + # 2. A `#churn` method that runs within the thread pool, this is what can + # be used to invoke app's logic. # # The underlying socket is available through `@io`. class StreamClient @@ -15,18 +19,10 @@ def to_io @io end - def stream? - true - end - def timeout_at false end - def close - @io.close - end - def closed? @io.closed? end @@ -35,7 +31,22 @@ def closed? # to be read. You can read from `@io` at this time. # # You can return a truthy value to add this client to the thread pool. - def read_more + def on_read_ready + raise NotImplementedError + end + + # This method will be invoked when the underlying connection is broken + # for whatever reason (client timeout, abrupt client disconnection, etc.). + # + # You can return a truthy value to add this client to the thread pool. + def on_broken_pipe + raise NotImplementedError + end + + # This method will be invoked when the server is being stopped. + # + # It's not possible to run more work on the thread pool at this stage. + def on_shutdown raise NotImplementedError end From bb25c5a895398d49450bdfbf8bbabdca05607d81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Burgos=20Maci=C3=A1?= Date: Tue, 16 Jul 2019 17:31:02 -0400 Subject: [PATCH 3/9] Slightly better doc --- lib/puma/stream_client.rb | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/puma/stream_client.rb b/lib/puma/stream_client.rb index 228151d13e..5add5bb773 100644 --- a/lib/puma/stream_client.rb +++ b/lib/puma/stream_client.rb @@ -5,11 +5,13 @@ module Puma # The subclasses are expected to implement: # # 1. Methods to respond to changes in the underlying socket. These are - # `#on_read_ready`, `#on_shutdown` & `#on_broken_pipe`. + # `#on_read_ready`, `#on_broken_pipe` & `#on_shutdown`. # 2. A `#churn` method that runs within the thread pool, this is what can # be used to invoke app's logic. # # The underlying socket is available through `@io`. + # + # The other methods should never be overriden. class StreamClient def initialize(io) @io = io From bde424390179ce44ad7a10c977da63c6ba67fc28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Burgos=20Maci=C3=A1?= Date: Tue, 16 Jul 2019 18:10:17 -0400 Subject: [PATCH 4/9] Fit all text in 80 cols --- docs/plugins.md | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/docs/plugins.md b/docs/plugins.md index 3e0b531e47..ad02e78d75 100644 --- a/docs/plugins.md +++ b/docs/plugins.md @@ -1,15 +1,22 @@ ## Plugins -Puma 3.0 added support for plugins that can augment configuration and service operations. +Puma 3.0 added support for plugins that can augment configuration and service +operations. 2 canonical plugins to look to aid in development of further plugins: -* [tmp\_restart](https://github.com/puma/puma/blob/master/lib/puma/plugin/tmp_restart.rb): Restarts the server if the file `tmp/restart.txt` is touched -* [heroku](https://github.com/puma/puma-heroku/blob/master/lib/puma/plugin/heroku.rb): Packages up the default configuration used by puma on Heroku +* [tmp\_restart](https://github.com/puma/puma/blob/master/lib/puma/plugin/tmp_restart.rb): + Restarts the server if the file `tmp/restart.txt` is touched +* [heroku](https://github.com/puma/puma-heroku/blob/master/lib/puma/plugin/heroku.rb): + Packages up the default configuration used by puma on Heroku -Plugins are activated in a puma configuration file (such as `config/puma.rb'`) by adding `plugin "name"`, such as `plugin "heroku"`. +Plugins are activated in a puma configuration file (such as `config/puma.rb'`) +by adding `plugin "name"`, such as `plugin "heroku"`. -Plugins are activated based simply on path requirements so, activating the `heroku` plugin will simply be doing `require "puma/plugin/heroku"`. This allows gems to provide multiple plugins (as well as unrelated gems to provide puma plugins). +Plugins are activated based simply on path requirements so, activating the +`heroku` plugin will simply be doing `require "puma/plugin/heroku"`. This +allows gems to provide multiple plugins (as well as unrelated gems to provide +puma plugins). The `tmp_restart` plugin is bundled with puma, so it can always be used. @@ -19,10 +26,11 @@ To use the `heroku` plugin, add `puma-heroku` to your Gemfile or install it. At present, there are 2 hooks that plugins can use: `start` and `config`. -`start` runs when the server has started and allows the plugin to start other functionality to augment puma. +`start` runs when the server has started and allows the plugin to start other +functionality to augment puma. -`config` runs when the server is being configured and is passed a `Puma::DSL` object that can be used to add additional configuration. +`config` runs when the server is being configured and is passed a `Puma::DSL` +object that can be used to add additional configuration. -Any public methods in `Puma::Plugin` are the public API that any plugin may use. - -In the future, more hooks and APIs will be added. +Any public methods in `Puma::Plugin` are the public API that any plugin may +use. From 17e72051f6b9700db34bd8a3f81e3d13c594cd65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Burgos=20Maci=C3=A1?= Date: Tue, 16 Jul 2019 18:34:41 -0400 Subject: [PATCH 5/9] Adds a bit of docs on the new hooks for plugins --- docs/plugins.md | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/docs/plugins.md b/docs/plugins.md index ad02e78d75..36fd5239f4 100644 --- a/docs/plugins.md +++ b/docs/plugins.md @@ -24,7 +24,9 @@ To use the `heroku` plugin, add `puma-heroku` to your Gemfile or install it. ### API -At present, there are 2 hooks that plugins can use: `start` and `config`. +## Server-wide hooks + +Plugins can use a couple of hooks at server level: `start` and `config`. `start` runs when the server has started and allows the plugin to start other functionality to augment puma. @@ -34,3 +36,17 @@ object that can be used to add additional configuration. Any public methods in `Puma::Plugin` are the public API that any plugin may use. + +## Per request hooks + +`#on_before_rack(env)` will be called right before the Rack application is +invoked. The called hook may modify `env` just like any Rack middleware. + +`#on_after_rack(env, headers, io)` will be called after the Rack application +has completed its execution and before any response content is written to the +client. A plugin may take over from here by returning an instance of a +`Puma::StreamClient` descendant. Check out `lib/puma/stream_client.rb` to know +more about this interface. + +If more than one plugin arises interest in taking over, an exception will +be happen and Puma will serve a 500. From f0fac281b8233df466bb5284d1d3cab55721cb9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Burgos=20Maci=C3=A1?= Date: Wed, 17 Jul 2019 09:19:40 -0400 Subject: [PATCH 6/9] Avoid having to define StreamClient in reactor --- docs/plugins.md | 6 +++--- lib/puma/client.rb | 4 ++++ lib/puma/reactor.rb | 2 +- lib/puma/server.rb | 2 +- lib/puma/stream_client.rb | 4 ++++ 5 files changed, 13 insertions(+), 5 deletions(-) diff --git a/docs/plugins.md b/docs/plugins.md index 36fd5239f4..e41f68801f 100644 --- a/docs/plugins.md +++ b/docs/plugins.md @@ -44,9 +44,9 @@ invoked. The called hook may modify `env` just like any Rack middleware. `#on_after_rack(env, headers, io)` will be called after the Rack application has completed its execution and before any response content is written to the -client. A plugin may take over from here by returning an instance of a -`Puma::StreamClient` descendant. Check out `lib/puma/stream_client.rb` to know -more about this interface. +client. A plugin may take over from here by returning an object that responds +to `#stream?` with a truthy value. Check out `lib/puma/stream_client.rb` to +know more about this interface. If more than one plugin arises interest in taking over, an exception will be happen and Puma will serve a 500. diff --git a/lib/puma/client.rb b/lib/puma/client.rb index 8564ad8495..a766f53d86 100644 --- a/lib/puma/client.rb +++ b/lib/puma/client.rb @@ -145,6 +145,10 @@ def on_shutdown close end + def stream? + false + end + # The object used for a request with no body. All requests with # no body share this one object since it has no state. EmptyBody = NullIO.new diff --git a/lib/puma/reactor.rb b/lib/puma/reactor.rb index 20f7cdb890..9489a79975 100644 --- a/lib/puma/reactor.rb +++ b/lib/puma/reactor.rb @@ -215,7 +215,7 @@ def run_internal end end - if c.is_a? StreamClient + if c.stream? if c.on_read_ready @app_pool << c end diff --git a/lib/puma/server.rb b/lib/puma/server.rb index b2249f8092..44fa2d3515 100644 --- a/lib/puma/server.rb +++ b/lib/puma/server.rb @@ -711,7 +711,7 @@ def handle_request(req, lines) @on_after_rack.reverse_each do |hook| stream_client = hook.call(env, headers, req.io) - if stream_client.is_a? StreamClient + if stream_client.stream? if is_async raise "Only one #on_after_rack hook should take over" else diff --git a/lib/puma/stream_client.rb b/lib/puma/stream_client.rb index 5add5bb773..966b91cebe 100644 --- a/lib/puma/stream_client.rb +++ b/lib/puma/stream_client.rb @@ -29,6 +29,10 @@ def closed? @io.closed? end + def stream? + true + end + # This method will be invoked when the IO descriptor has new data pending # to be read. You can read from `@io` at this time. # From f6de6b6fd219e4c2b3f56bda45552b5b9a3cf93b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Burgos=20Maci=C3=A1?= Date: Mon, 29 Jul 2019 11:10:05 -0400 Subject: [PATCH 7/9] Avoid failing if the callback returns nil --- lib/puma/server.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/puma/server.rb b/lib/puma/server.rb index 44fa2d3515..f7aa0dfb29 100644 --- a/lib/puma/server.rb +++ b/lib/puma/server.rb @@ -711,7 +711,7 @@ def handle_request(req, lines) @on_after_rack.reverse_each do |hook| stream_client = hook.call(env, headers, req.io) - if stream_client.stream? + if stream_client && stream_client.stream? if is_async raise "Only one #on_after_rack hook should take over" else From 39af390526e4b9cdafb8ffb2ccd3455fbee46f47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Burgos=20Maci=C3=A1?= Date: Tue, 30 Jul 2019 11:19:33 -0400 Subject: [PATCH 8/9] Avoids calling `#nil?` --- lib/puma/server.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/puma/server.rb b/lib/puma/server.rb index f7aa0dfb29..f4f023f375 100644 --- a/lib/puma/server.rb +++ b/lib/puma/server.rb @@ -672,7 +672,7 @@ def handle_request(req, lines) # after_reply = env[RACK_AFTER_REPLY] = [] - unless @on_before_rack.nil? + if @on_before_rack @on_before_rack.each { |hook| hook.call(env) } end @@ -705,7 +705,7 @@ def handle_request(req, lines) status, headers, res_body = lowlevel_error(e, env) end - unless @on_after_rack.nil? + if @on_after_rack is_async = false @on_after_rack.reverse_each do |hook| From aa6ccbe7f9967ff539706ba505e8583cee514849 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Burgos=20Maci=C3=A1?= Date: Tue, 30 Jul 2019 11:20:23 -0400 Subject: [PATCH 9/9] Adds more detail on what's the purpose of `#churn` --- lib/puma/stream_client.rb | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/puma/stream_client.rb b/lib/puma/stream_client.rb index 966b91cebe..8c2ab7f272 100644 --- a/lib/puma/stream_client.rb +++ b/lib/puma/stream_client.rb @@ -62,6 +62,9 @@ def on_shutdown # # You can return a truthy value to add this client to the thread pool # again. + # + # This allows the plugin to process its work on the thread pool as any + # other regular HTTP request. def churn raise NotImplementedError end