Skip to content

Commit

Permalink
Merge pull request #3680 from daipom/add-sigdump-func-to-fluent-ctl
Browse files Browse the repository at this point in the history
Add sigdump func to fluent-ctl
  • Loading branch information
ashie committed May 20, 2022
2 parents 12d445f + 4b18c6f commit ebab9db
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 23 deletions.
3 changes: 3 additions & 0 deletions lib/fluent/command/ctl.rb
Expand Up @@ -36,6 +36,7 @@ class Ctl
restart: "HUP",
flush: "USR1",
reload: "USR2",
dump: "CONT",
}
WINSVC_CONTROL_CODE_MAP = {
shutdown: SERVICE_CONTROL_STOP,
Expand All @@ -44,13 +45,15 @@ class Ctl
restart: 128,
flush: 129,
reload: SERVICE_CONTROL_PARAMCHANGE,
dump: 130,
}
else
COMMAND_MAP = {
shutdown: :TERM,
restart: :HUP,
flush: :USR1,
reload: :USR2,
dump: :CONT,
}
end

Expand Down
111 changes: 91 additions & 20 deletions lib/fluent/supervisor.rb
Expand Up @@ -16,6 +16,7 @@

require 'fileutils'
require 'open3'
require 'pathname'

require 'fluent/config'
require 'fluent/counter'
Expand Down Expand Up @@ -215,44 +216,50 @@ def install_windows_event_handler
Thread.new do
ipc = Win32::Ipc.new(nil)
events = [
Win32::Event.new("#{@pid_signame}_STOP_EVENT_THREAD"),
Win32::Event.new("#{@pid_signame}"),
Win32::Event.new("#{@pid_signame}_HUP"),
Win32::Event.new("#{@pid_signame}_USR1"),
Win32::Event.new("#{@pid_signame}_USR2"),
{win32_event: Win32::Event.new("#{@pid_signame}_STOP_EVENT_THREAD"), action: :stop_event_thread},
{win32_event: Win32::Event.new("#{@pid_signame}"), action: :stop},
{win32_event: Win32::Event.new("#{@pid_signame}_HUP"), action: :hup},
{win32_event: Win32::Event.new("#{@pid_signame}_USR1"), action: :usr1},
{win32_event: Win32::Event.new("#{@pid_signame}_USR2"), action: :usr2},
{win32_event: Win32::Event.new("#{@pid_signame}_CONT"), action: :cont},
]
if @signame
signame_events = [
Win32::Event.new("#{@signame}"),
Win32::Event.new("#{@signame}_HUP"),
Win32::Event.new("#{@signame}_USR1"),
Win32::Event.new("#{@signame}_USR2"),
{win32_event: Win32::Event.new("#{@signame}"), action: :stop},
{win32_event: Win32::Event.new("#{@signame}_HUP"), action: :hup},
{win32_event: Win32::Event.new("#{@signame}_USR1"), action: :usr1},
{win32_event: Win32::Event.new("#{@signame}_USR2"), action: :usr2},
{win32_event: Win32::Event.new("#{@signame}_CONT"), action: :cont},
]
events.concat(signame_events)
end
begin
loop do
idx = ipc.wait_any(events, Windows::Synchronize::INFINITE)
if idx > 0 && idx <= events.length
$log.debug("Got Win32 event \"#{events[idx - 1].name}\"")
ipc_idx = ipc.wait_any(events.map {|e| e[:win32_event]}, Windows::Synchronize::INFINITE)
event_idx = ipc_idx - 1

if event_idx >= 0 && event_idx < events.length
$log.debug("Got Win32 event \"#{events[event_idx][:win32_event].name}\"")
else
$log.warn("Unexpected reutrn value of Win32::Ipc#wait_any: #{idx}")
$log.warn("Unexpected return value of Win32::Ipc#wait_any: #{ipc_idx}")
end
case idx
when 2, 6
case events[event_idx][:action]
when :stop
stop(true)
when 3, 7
when :hup
supervisor_sighup_handler
when 4, 8
when :usr1
supervisor_sigusr1_handler
when 5, 9
when :usr2
supervisor_sigusr2_handler
when 1
when :cont
supervisor_dump_handler_for_windows
when :stop_event_thread
break
end
end
ensure
events.each { |event| event.close }
events.each { |event| event[:win32_event].close }
end
end
end
Expand Down Expand Up @@ -302,6 +309,26 @@ def supervisor_sigusr2_handler
$log.error "Failed to reload config file: #{e}"
end

# Windows-only counterpart of the SIGCONT dump: dumps the supervisor process
# itself and then forwards :CONT to the workers.
#
# On UNIX-like systems each process outputs its own dump file when it receives
# SIGCONT, and that behavior predates this Windows implementation. SIGCONT
# could also be trapped and handled here on UNIX-like systems, but for
# backward compatibility this handler stays Windows-only.
#
# Any StandardError raised in this method (including the platform guard below)
# is logged instead of being propagated to the caller.
def supervisor_dump_handler_for_windows
  unless Fluent.windows?
    raise "[BUG] This function is for Windows ONLY."
  end

  # Dump in a separate thread; its failures are logged inside that thread.
  Thread.new {
    begin
      FluentSigdump.dump_windows
    rescue => dump_error
      $log.error "failed to dump: #{dump_error}"
    end
  }

  send_signal_to_workers(:CONT)
rescue => handler_error
  $log.error "failed to dump: #{handler_error}"
end

def kill_worker
if config[:worker_pid]
pids = config[:worker_pid].clone
Expand Down Expand Up @@ -358,6 +385,14 @@ def send_command_to_workers(signal)
restart(true)
when :USR2
reload
when :CONT
dump_all_windows_workers
end
end

# Sends the "DUMP" command line to every monitor in @monitors.
def dump_all_windows_workers
  @monitors.each { |monitor| monitor.send_command("DUMP\n") }
end
end
Expand Down Expand Up @@ -906,6 +941,9 @@ def install_main_process_command_handlers
when "RELOAD"
$log.debug "fluentd main process get #{cmd} command"
reload_config
when "DUMP"
$log.debug "fluentd main process get #{cmd} command"
dump
else
$log.warn "fluentd main process get unknown command [#{cmd}]"
end
Expand Down Expand Up @@ -956,6 +994,16 @@ def reload_config
end
end

# Starts a background thread that runs FluentSigdump.dump_windows and returns
# that thread. A StandardError raised by the dump is logged via $log.error
# rather than escaping from the thread.
def dump
  Thread.new {
    begin
      FluentSigdump.dump_windows
    rescue => dump_error
      $log.error("failed to dump: #{dump_error}")
    end
  }
end

def logging_with_console_output
yield $log
unless @log.stdout?
Expand Down Expand Up @@ -1065,4 +1113,27 @@ def build_spawn_command
fluentd_spawn_cmd
end
end

# Helpers for producing sigdump output on Windows, where the signal-based
# trigger used on UNIX-like systems is not available.
module FluentSigdump
  # SIGDUMP_PATH values that the sigdump gem treats as output streams rather
  # than file names: "-" selects STDOUT and "+" selects STDERR.
  STREAM_PATHS = ["-", "+"].freeze

  # Writes a dump of the current process and logs where it went.
  #
  # The destination is taken from the SIGDUMP_PATH environment variable, with
  # the process id inserted so that each process writes to its own file. When
  # SIGDUMP_PATH is unset or empty, a file under "<windir>/Temp" is used.
  #
  # Raises RuntimeError when called on a non-Windows platform.
  def self.dump_windows
    raise "[BUG] FluentSigdump.dump_windows is for Windows ONLY." unless Fluent.windows?

    # Sigdump outputs under `/tmp` dir without `SIGDUMP_PATH` specified,
    # but `/tmp` dir may not exist on Windows by default.
    # So use the systemroot-temp-dir instead.
    configured_path = ENV['SIGDUMP_PATH']
    dump_filepath =
      if configured_path.nil? || configured_path.empty?
        "#{ENV['windir']}/Temp/fluentd-sigdump-#{Process.pid}.log"
      else
        get_path_with_pid(configured_path)
      end

    # Loaded lazily so that the library is only required when a dump is requested.
    require 'sigdump'
    Sigdump.dump(dump_filepath)

    $log.info "dump to #{dump_filepath}."
  end

  # Returns raw_path with "-<pid>" inserted before the last extension
  # (or appended when there is no extension), as a String.
  #
  # The stream selectors "-" and "+" are returned unchanged: appending a pid
  # would turn them into ordinary file names and redirect the dump away from
  # the stream that was asked for.
  def self.get_path_with_pid(raw_path)
    return raw_path if STREAM_PATHS.include?(raw_path)

    path = Pathname.new(raw_path)
    path.sub_ext("-#{Process.pid}#{path.extname}").to_s
  end
end
end
2 changes: 2 additions & 0 deletions lib/fluent/winsvc.rb
Expand Up @@ -84,6 +84,8 @@ def service_user_defined_control(code)
set_event("#{@service_name}_HUP")
when 129
set_event("#{@service_name}_USR1")
when 130
set_event("#{@service_name}_CONT")
end
end

Expand Down
12 changes: 9 additions & 3 deletions test/plugin/test_in_object_space.rb
Expand Up @@ -17,13 +17,19 @@ def waiting(seconds, instance)
end

class FailObject
def self.class
raise "error"
end
end

# Prepares the Fluent test environment and makes `FailObject.class` raise.
def setup
  Fluent::Test.setup
  # The override is installed per test rather than at class-definition time:
  # leaving it in the global scope would have an unexpected influence on other
  # tests. `teardown` is expected to remove it again.
  class << FailObject
    def class
      raise "FailObject error for tests in ObjectSpaceInputTest."
    end
  end
end

# Removes the singleton `class` method from FailObject so that the regular
# `class` lookup applies again.
def teardown
  class << FailObject
    remove_method(:class)
  end
end

TESTCONFIG = %[
Expand Down
43 changes: 43 additions & 0 deletions test/test_supervisor.rb
Expand Up @@ -300,6 +300,49 @@ def server.config
$log.out.reset if $log && $log.out && $log.out.respond_to?(:reset)
end

data("Normal", {raw_path: "C:\\Windows\\Temp\\sigdump.log", expected: "C:\\Windows\\Temp\\sigdump-#{$$}.log"})
data("UNIX style", {raw_path: "/Windows/Temp/sigdump.log", expected: "/Windows/Temp/sigdump-#{$$}.log"})
data("No extension", {raw_path: "C:\\Windows\\Temp\\sigdump", expected: "C:\\Windows\\Temp\\sigdump-#{$$}"})
data("Multi-extension", {raw_path: "C:\\Windows\\Temp\\sig.dump.bk", expected: "C:\\Windows\\Temp\\sig.dump-#{$$}.bk"})
# Checks that FluentSigdump.get_path_with_pid turns data[:raw_path] into
# data[:expected] for each data set declared for this test.
def test_fluentsigdump_get_path_with_pid(data)
  path = Fluent::FluentSigdump.get_path_with_pid(data[:raw_path])
  assert_equal(data[:expected], path)
end

# Sets the "TestFluentdEvent_CONT" Win32 event on a server whose signame is
# "TestFluentdEvent" and expects (via an RR mock) that `Sigdump.dump` gets called.
# Omitted on non-Windows platforms.
def test_supervisor_event_dump_windows
omit "Only for Windows, alternative to UNIX signals" unless Fluent.windows?

server = DummyServer.new
def server.config
{:signame => "TestFluentdEvent"}
end
server.install_windows_event_handler

assert_rr do
# `Sigdump.dump` has to be mocked because it seems to be somehow incompatible with RR.
# The `mock(server).restart(true) { nil }` line in `test_rpc_server_windows` causes the following error:
# Failure: test_supervisor_event_dump_windows(SupervisorTest):
# class()
# Called 0 times.
# Expected 1 times.
# .../Ruby26-x64/lib/ruby/gems/2.6.0/gems/sigdump-0.2.4/lib/sigdump.rb:74:in `block in dump_object_count'
# 73: ObjectSpace.each_object {|o|
# 74: c = o.class <-- HERE!
mock(Sigdump).dump(anything)

begin
sleep 0.1 # Wait for the Windows event thread to start
event = Win32::Event.open("TestFluentdEvent_CONT")
event.set
event.close
sleep 1.0 # Wait for the dump to be performed
ensure
# Always stop the event thread, even when the steps above fail.
server.stop_windows_event_thread
end
end
end

data(:ipv4 => ["0.0.0.0", "127.0.0.1", false],
:ipv6 => ["[::]", "[::1]", true],
:localhost_ipv4 => ["localhost", "127.0.0.1", false])
Expand Down

0 comments on commit ebab9db

Please sign in to comment.