Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_tail: Simplify TargetInfo related code #3489

Merged
merged 6 commits into from May 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
69 changes: 30 additions & 39 deletions lib/fluent/plugin/in_tail.rb
Expand Up @@ -357,11 +357,11 @@ def expand_paths

def existence_path
hash = {}
@tails.each_key {|target_info|
@tails.each {|path, tw|
if @follow_inodes
hash[target_info.ino] = target_info
hash[tw.ino] = TargetInfo.new(tw.path, tw.ino)
else
hash[target_info.path] = target_info
hash[tw.path] = TargetInfo.new(tw.path, tw.ino)
end
}
hash
Expand Down Expand Up @@ -425,36 +425,31 @@ def setup_watcher(target_info, pe)
end

def construct_watcher(target_info)
path = target_info.path

# The file might be rotated or removed after collecting paths, so check inode again here.
begin
target_info.ino = Fluent::FileWrapper.stat(path).ino
rescue Errno::ENOENT, Errno::EACCES
$log.warn "stat() for #{path} failed. Continuing without tailing it."
return
end

pe = nil
if @pf
pe = @pf[target_info]
if @read_from_head && pe.read_inode.zero?
begin
pe.update(Fluent::FileWrapper.stat(target_info.path).ino, 0)
rescue Errno::ENOENT, Errno::EACCES
$log.warn "stat() for #{target_info.path} failed. Continuing without tailing it."
end
end
pe.update(target_info.ino, 0) if @read_from_head && pe.read_inode.zero?
end

begin
tw = setup_watcher(target_info, pe)
rescue WatcherSetupError => e
log.warn "Skip #{target_info.path} because unexpected setup error happens: #{e}"
log.warn "Skip #{path} because unexpected setup error happens: #{e}"
return
end

begin
target_info = TargetInfo.new(target_info.path, Fluent::FileWrapper.stat(target_info.path).ino)
@tails.delete(target_info)
@tails[target_info] = tw
tw.on_notify
rescue Errno::ENOENT, Errno::EACCES => e
$log.warn "stat() for #{target_info.path} failed with #{e.class.name}. Drop tail watcher for now."
# explicitly detach and unwatch watcher `tw`.
tw.unwatched = true
detach_watcher(tw, target_info.ino, false)
end
@tails[path] = tw
tw.on_notify
end

def start_watchers(targets_info)
Expand All @@ -469,9 +464,9 @@ def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watch
remove_path_from_group_watcher(target_info.path)

if remove_watcher
tw = @tails.delete(target_info)
tw = @tails.delete(target_info.path)
else
tw = @tails[target_info]
tw = @tails[target_info.path]
end
if tw
tw.unwatched = unwatched
Expand All @@ -485,8 +480,8 @@ def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watch
end

def close_watcher_handles
@tails.keys.each do |target_info|
tw = @tails.delete(target_info)
@tails.keys.each do |path|
tw = @tails.delete(path)
if tw
tw.close
end
Expand All @@ -495,20 +490,20 @@ def close_watcher_handles

# refresh_watchers calls @tails.keys so we don't use stop_watcher -> start_watcher sequence for safety.
def update_watcher(target_info, pe)
log.info("detected rotation of #{target_info.path}; waiting #{@rotate_wait} seconds")
path = target_info.path

log.info("detected rotation of #{path}; waiting #{@rotate_wait} seconds")

if @pf
pe_inode = pe.read_inode
target_info_from_position_entry = TargetInfo.new(target_info.path, pe_inode)
target_info_from_position_entry = TargetInfo.new(path, pe_inode)
unless pe_inode == @pf[target_info_from_position_entry].read_inode
log.debug "Skip update_watcher because watcher has been already updated by other inotify event"
return
end
end

rotated_target_info = TargetInfo.new(target_info.path, pe.read_inode)
rotated_tw = @tails[rotated_target_info]
new_target_info = target_info.dup
rotated_tw = @tails[path]

if @follow_inodes
new_position_entry = @pf[target_info]
Expand All @@ -517,16 +512,12 @@ def update_watcher(target_info, pe)
# When follow_inodes is true, it's not cleaned up by refresh_watcher.
# So it should be unwatched here explicitly.
rotated_tw.unwatched = true
# Make sure to delete old key, it has a different ino while the hash key is same.
@tails.delete(rotated_target_info)
@tails[new_target_info] = setup_watcher(new_target_info, new_position_entry)
@tails[new_target_info].on_notify
@tails[path] = setup_watcher(target_info, new_position_entry)
@tails[path].on_notify
end
else
# Make sure to delete old key, it has a different ino while the hash key is same.
@tails.delete(rotated_target_info)
@tails[new_target_info] = setup_watcher(new_target_info, pe)
@tails[new_target_info].on_notify
@tails[path] = setup_watcher(target_info, pe)
@tails[path].on_notify
end
detach_watcher_after_rotate_wait(rotated_tw, pe.read_inode) if rotated_tw
end
Expand Down
16 changes: 1 addition & 15 deletions lib/fluent/plugin/in_tail/position_file.rb
Expand Up @@ -250,20 +250,6 @@ def read_inode
end
end

TargetInfo = Struct.new(:path, :ino) do
def ==(other)
return false unless other.is_a?(TargetInfo)
self.path == other.path
end

def hash
self.path.hash
end

def eql?(other)
return false unless other.is_a?(TargetInfo)
self.path == other.path
end
end
TargetInfo = Struct.new(:path, :ino)
end
end
54 changes: 0 additions & 54 deletions test/plugin/in_tail/test_position_file.rb
Expand Up @@ -313,58 +313,4 @@ def build_files(file)
assert_equal 2, f.read_inode
end
end

sub_test_case "TargetInfo equality rules" do
sub_test_case "== operator" do
def test_equal
t1 = Fluent::Plugin::TailInput::TargetInfo.new("test", 1234)
t2 = Fluent::Plugin::TailInput::TargetInfo.new("test", 1235)

assert_equal t1, t2
end

def test_not_equal
t1 = Fluent::Plugin::TailInput::TargetInfo.new("test", 1234)
t2 = Fluent::Plugin::TailInput::TargetInfo.new("test2", 1234)

assert_not_equal t1, t2
end
end

sub_test_case "eql? method" do
def test_eql?
t1 = Fluent::Plugin::TailInput::TargetInfo.new("test", 1234)
t2 = Fluent::Plugin::TailInput::TargetInfo.new("test", 5321)

assert do
t1.eql? t2
end
end

def test_not_eql?
t1 = Fluent::Plugin::TailInput::TargetInfo.new("test2", 1234)
t2 = Fluent::Plugin::TailInput::TargetInfo.new("test3", 1234)

assert do
!t1.eql? t2
end
end
end

sub_test_case "hash" do
def test_equal
t1 = Fluent::Plugin::TailInput::TargetInfo.new("test", 1234)
t2 = Fluent::Plugin::TailInput::TargetInfo.new("test", 7321)

assert_equal t1.hash, t2.hash
end

def test_not_equal
t1 = Fluent::Plugin::TailInput::TargetInfo.new("test", 1234)
t2 = Fluent::Plugin::TailInput::TargetInfo.new("test2", 1234)

assert_not_equal t1.hash, t2.hash
end
end
end
end
45 changes: 25 additions & 20 deletions test/plugin/test_in_tail.rb
Expand Up @@ -1777,8 +1777,7 @@ def test_z_refresh_watchers
end

path = 'test/plugin/data/2010/01/20100102-030405.log'
target_info = Fluent::Plugin::TailInput::TargetInfo.new(path, Fluent::FileWrapper.stat(path).ino)
mock.proxy(plugin).detach_watcher_after_rotate_wait(plugin.instance_variable_get(:@tails)[target_info], target_info.ino)
mock.proxy(plugin).detach_watcher_after_rotate_wait(plugin.instance_variable_get(:@tails)[path], Fluent::FileWrapper.stat(path).ino)

Timecop.freeze(2010, 1, 2, 3, 4, 6) do
path = "test/plugin/data/2010/01/20100102-030406.log"
Expand Down Expand Up @@ -2179,13 +2178,13 @@ def test_should_close_watcher_after_rotate_wait
target_info = create_target_info("#{TMP_DIR}/tail.txt")
mock.proxy(Fluent::Plugin::TailInput::TailWatcher).new(target_info, anything, anything, true, true, anything, nil, anything, anything).once
d.run(shutdown: false)
assert d.instance.instance_variable_get(:@tails)[target_info]
assert d.instance.instance_variable_get(:@tails)[target_info.path]

Timecop.travel(now + 10) do
d.instance.instance_eval do
sleep 0.1 until @tails[target_info] == nil
sleep 0.1 until @tails[target_info.path] == nil
end
assert_nil d.instance.instance_variable_get(:@tails)[target_info]
assert_nil d.instance.instance_variable_get(:@tails)[target_info.path]
end
d.instance_shutdown
end
Expand Down Expand Up @@ -2216,8 +2215,8 @@ def test_should_create_new_watcher_for_new_file_with_same_name
Timecop.travel(now + 10) do
sleep 3
d.instance.instance_eval do
@tails[path_ino] == nil
@tails[new_path_ino] != nil
@tails[path_ino.path] == nil
@tails[new_path_ino.path] != nil
end
end

Expand Down Expand Up @@ -2276,8 +2275,8 @@ def test_should_replace_target_info
while d.events.size < 1 do
sleep 0.1
end
inodes = d.instance.instance_variable_get(:@tails).keys.collect do |key|
key.ino
inodes = d.instance.instance_variable_get(:@tails).values.collect do |tw|
tw.ino
end
assert_equal([target_info.ino], inodes)

Expand All @@ -2287,8 +2286,8 @@ def test_should_replace_target_info
while d.events.size < 2 do
sleep 0.1
end
inodes = d.instance.instance_variable_get(:@tails).keys.collect do |key|
key.ino
inodes = d.instance.instance_variable_get(:@tails).values.collect do |tw|
tw.ino
end
new_target_info = create_target_info("#{TMP_DIR}/tail.txt")
assert_not_equal(target_info.ino, new_target_info.ino)
Expand Down Expand Up @@ -2458,14 +2457,19 @@ def test_ENOENT_error_after_setup_watcher
'format' => 'none',
})
d = create_driver(config)
mock.proxy(d.instance).setup_watcher(anything, anything) do |tw|
cleanup_file(path)
tw
end
file_deleted = false
mock.proxy(d.instance).existence_path do |hash|
unless file_deleted
cleanup_file(path)
file_deleted = true
end
hash
end.twice
assert_nothing_raised do
d.run(shutdown: false) {}
end
assert($log.out.logs.any?{|log| log.include?("stat() for #{path} failed with Errno::ENOENT. Drop tail watcher for now.\n") })
assert($log.out.logs.any?{|log| log.include?("stat() for #{path} failed. Continuing without tailing it.\n") },
$log.out.logs.join("\n"))
ensure
d.instance_shutdown if d && d.instance
end
Expand All @@ -2483,14 +2487,15 @@ def test_EACCES_error_after_setup_watcher
'format' => 'none',
})
d = create_driver(config, false)
mock.proxy(d.instance).setup_watcher(anything, anything) do |tw|
mock.proxy(d.instance).existence_path do |hash|
FileUtils.chmod(0000, "#{TMP_DIR}/noaccess")
tw
end
hash
end.twice
assert_nothing_raised do
d.run(shutdown: false) {}
end
assert($log.out.logs.any?{|log| log.include?("stat() for #{path} failed with Errno::EACCES. Drop tail watcher for now.\n") })
assert($log.out.logs.any?{|log| log.include?("stat() for #{path} failed. Continuing without tailing it.\n") },
$log.out.logs.join("\n"))
end
ensure
d.instance_shutdown if d && d.instance
Expand Down