/
worker.rb
173 lines (150 loc) · 5.59 KB
/
worker.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# frozen_string_literal: true
module Puma
class Cluster < Puma::Runner
# This class is instantiated by the `Puma::Cluster` and represents a single
# worker process.
#
# At the core of this class is running an instance of `Puma::Server` which
# gets created via the `start_server` method from the `Puma::Runner` class
# that this inherits from.
class Worker < Puma::Runner
attr_reader :index, :master
# Creates a worker.
#
# index    - this worker's index; interpolated into the process title and
#            compared against 0 when deciding whether `fork_worker` applies.
# master   - value interpolated into the process title and handed on to
#            workers spawned by this one.
# launcher - supplies `events`, `options` and `config` (used to run hooks).
# pipes    - Hash read with the keys :check_pipe, :worker_write, :fork_pipe
#            and :wakeup; keys that are absent simply yield nil.
# server   - optional already-built server; when given, `run` reuses it
#            instead of calling `start_server`.
def initialize(index:, master:, launcher:, pipes:, server: nil)
  super(launcher, launcher.events)

  @index    = index
  @master   = master
  @launcher = launcher
  @options  = launcher.options

  @check_pipe, @worker_write, @fork_pipe, @wakeup =
    pipes.values_at(:check_pipe, :worker_write, :fork_pipe, :wakeup)

  @server = server
end
# Runs the worker: sets the process title, installs signal handlers, boots
# (or reuses) the server and keeps serving until the restart queue yields
# a falsy value.
#
# Lines written to the `worker_write` pipe by this method:
#   "b<pid>:<index>\n" - once, right before the serve loop starts
#   "f<pid>:<idx>\n"   - each time this worker forks another worker
#   "p<pid>{...}\n"    - periodically, carrying the server's stats
#   "e<pid>\n"         - when SIGTERM is received
#   "t<pid>\n"         - from the `ensure` clause, just before the pipe is closed
#
# Exits the process with status 1 when the server cannot be started, and
# returns early when the initial "b" line cannot be written.
def run
  # `+=` builds a new String, so this is safe under `frozen_string_literal`.
  title = "puma: cluster worker #{index}: #{master}"
  title += " [#{@options[:tag]}]" if @options[:tag] && !@options[:tag].empty?
  $0 = title

  Signal.trap "SIGINT", "IGNORE"
  Signal.trap "SIGCHLD", "DEFAULT"

  # Watchdog: as soon as the check pipe becomes readable the parent is
  # considered dead and this process is terminated immediately.
  Thread.new do
    Puma.set_thread_name "worker check pipe"
    @check_pipe.wait_readable
    log "! Detected parent died, dying"
    exit! 1
  end

  # If we're not running under a Bundler context, then
  # report the info about the context we will be using
  unless ENV['BUNDLE_GEMFILE']
    if File.exist?("Gemfile")
      log "+ Gemfile in context: #{File.expand_path("Gemfile")}"
    elsif File.exist?("gems.rb")
      log "+ Gemfile in context: #{File.expand_path("gems.rb")}"
    end
  end

  # Invoke any worker boot hooks so they can get
  # things in shape before booting the app.
  @launcher.config.run_hooks :before_worker_boot, index, @launcher.events

  begin
    server = @server ||= start_server
  rescue Exception => e
    # `Exception` rather than `StandardError`: presumably so that load-time
    # failures such as LoadError/SyntaxError are reported as well.
    log "! Unable to start worker"
    # Report what went wrong, not just where it happened.
    log "#{e.class}: #{e.message}"
    log e.backtrace.join("\n    ") if e.backtrace
    exit 1
  end

  # Each `true` popped from this queue starts one server run; `false`
  # ends the serve loop below.
  restart_server = Queue.new << true << false

  fork_worker = @options[:fork_worker] && index == 0

  if fork_worker
    # In fork_worker mode worker 0 waits for instructions on the fork pipe
    # before (re)starting its server.
    restart_server.clear
    worker_pids = []

    Signal.trap "SIGCHLD" do
      # Drop every pid that has been reaped (or can no longer be waited on);
      # `reject!` returns nil when nothing was removed.
      reaped = worker_pids.reject! do |child_pid|
        Process.wait(child_pid, Process::WNOHANG) rescue true
      end
      wakeup! if reaped
    end

    Thread.new do
      Puma.set_thread_name "worker fork pipe"
      while (idx = @fork_pipe.gets)
        idx = idx.to_i
        if idx == -1 # stop server
          if restart_server.length > 0
            restart_server.clear
            server.begin_restart(true)
            @launcher.config.run_hooks :before_refork, nil, @launcher.events
            Puma::Util.nakayoshi_gc @events if @options[:nakayoshi_fork]
          end
        elsif idx == 0 # restart server
          restart_server << true << false
        else # fork worker
          pid = spawn_worker(idx)
          worker_pids << pid
          @worker_write << "f#{pid}:#{idx}\n" rescue nil
        end
      end
    end
  end

  Signal.trap "SIGTERM" do
    @worker_write << "e#{Process.pid}\n" rescue nil
    restart_server.clear
    server.stop
    restart_server << false
  end

  begin
    @worker_write << "b#{Process.pid}:#{index}\n"
  rescue SystemCallError, IOError
    Puma::Util.purge_interrupt_queue
    STDERR.puts "Master seems to have exited, exiting."
    return
  end

  while restart_server.pop
    server_thread = server.run

    # The stat thread is created on the first pass only and is not
    # recreated on later server restarts.
    stat_thread ||= Thread.new(@worker_write) do |io|
      Puma.set_thread_name "stat payload"
      base_payload = "p#{Process.pid}"

      while true
        begin
          b = server.backlog || 0
          r = server.running || 0
          t = server.pool_capacity || 0
          m = server.max_threads || 0
          rc = server.requests_count || 0
          payload = %Q!#{base_payload}{ "backlog":#{b}, "running":#{r}, "pool_capacity":#{t}, "max_threads": #{m}, "requests_count": #{rc} }\n!
          io << payload
        rescue IOError, SystemCallError
          # Same failure set as the initial "b" write above: a pipe whose
          # reader is gone raises Errno::EPIPE, which is not an IOError.
          Puma::Util.purge_interrupt_queue
          break
        end
        sleep @options[:worker_check_interval]
      end
    end

    server_thread.join
  end

  # Invoke any worker shutdown hooks so they can prevent the worker
  # exiting until any background operations are completed
  @launcher.config.run_hooks :before_worker_shutdown, index, @launcher.events
ensure
  # Best effort: the pipe may already be unusable at this point.
  @worker_write << "t#{Process.pid}\n" rescue nil
  @worker_write.close
end
private
# Forks a new worker process for slot `idx` and returns the child's pid.
#
# The :before_worker_fork hook runs before forking and :after_worker_fork
# runs afterwards in the calling process. The child builds a fresh Worker
# that shares this worker's check pipe, write pipe and server, then runs it.
def spawn_worker(idx)
  @launcher.config.run_hooks(:before_worker_fork, idx, @launcher.events)

  pid = fork do
    Worker.new(
      index: idx,
      master: master,
      launcher: @launcher,
      pipes: { check_pipe: @check_pipe, worker_write: @worker_write },
      server: @server
    ).run
  end

  # A falsy pid is treated as fatal for this process.
  # NOTE(review): Kernel#fork with a block normally raises on failure
  # rather than returning nil - confirm whether this branch is reachable.
  unless pid
    log "! Complete inability to spawn new workers detected"
    log "! Seppuku is the only choice."
    exit! 1
  end

  @launcher.config.run_hooks(:after_worker_fork, idx, @launcher.events)
  pid
end
end
end
end