Skip to content

Commit

Permalink
in_tail: Don't use TargetInfo as hash table key
Browse files Browse the repository at this point in the history
Signed-off-by: Takuro Ashie <ashie@clear-code.com>
  • Loading branch information
ashie committed Aug 18, 2021
1 parent 2063d3b commit 5804d41
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 97 deletions.
29 changes: 12 additions & 17 deletions lib/fluent/plugin/in_tail.rb
Expand Up @@ -345,11 +345,11 @@ def expand_paths

def existence_path
hash = {}
@tails.each_key {|target_info|
@tails.each {|path, tw|
if @follow_inodes
hash[target_info.ino] = target_info
hash[tw.ino] = TargetInfo.new(tw.path, tw.ino)
else
hash[target_info.path] = target_info
hash[tw.path] = TargetInfo.new(tw.path, tw.ino)
end
}
hash
Expand Down Expand Up @@ -426,8 +426,7 @@ def construct_watcher(target_info)

begin
target_info = TargetInfo.new(target_info.path, Fluent::FileWrapper.stat(target_info.path).ino)
@tails.delete(target_info)
@tails[target_info] = tw
@tails[target_info.path] = tw
tw.on_notify
rescue Errno::ENOENT, Errno::EACCES => e
$log.warn "stat() for #{target_info.path} failed with #{e.class.name}. Drop tail watcher for now."
Expand All @@ -447,9 +446,9 @@ def start_watchers(targets_info)
def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watcher: true)
targets_info.each_value { |target_info|
if remove_watcher
tw = @tails.delete(target_info)
tw = @tails.delete(target_info.path)
else
tw = @tails[target_info]
tw = @tails[target_info.path]
end
if tw
tw.unwatched = unwatched
Expand All @@ -463,8 +462,8 @@ def stop_watchers(targets_info, immediate: false, unwatched: false, remove_watch
end

def close_watcher_handles
@tails.keys.each do |target_info|
tw = @tails.delete(target_info)
@tails.keys.each do |path|
tw = @tails.delete(path)
if tw
tw.close
end
Expand All @@ -485,7 +484,7 @@ def update_watcher(target_info, pe)
end

rotated_target_info = TargetInfo.new(target_info.path, pe.read_inode)
rotated_tw = @tails[rotated_target_info]
rotated_tw = @tails[rotated_target_info.path]
new_target_info = target_info.dup

if @follow_inodes
Expand All @@ -495,15 +494,11 @@ def update_watcher(target_info, pe)
# When follow_inodes is true, it's not cleaned up by refresh_watcher.
# So it should be unwatched here explicitly.
rotated_tw.unwatched = true
# Make sure to delete old key, it has a different ino while the hash key is same.
@tails.delete(rotated_target_info)
@tails[new_target_info] = setup_watcher(new_target_info, new_position_entry)
@tails[new_target_info].on_notify
@tails[new_target_info.path] = setup_watcher(new_target_info, new_position_entry)
@tails[new_target_info.path].on_notify
end
else
# Make sure to delete old key, it has a different ino while the hash key is same.
@tails.delete(rotated_target_info)
@tails[new_target_info] = setup_watcher(new_target_info, pe)
@tails[new_target_info.path] = setup_watcher(new_target_info, pe)
@tails[new_target_info].on_notify
end
detach_watcher_after_rotate_wait(rotated_tw, pe.read_inode) if rotated_tw
Expand Down
16 changes: 1 addition & 15 deletions lib/fluent/plugin/in_tail/position_file.rb
Expand Up @@ -250,20 +250,6 @@ def read_inode
end
end

TargetInfo = Struct.new(:path, :ino) do
def ==(other)
return false unless other.is_a?(TargetInfo)
self.path == other.path
end

def hash
self.path.hash
end

def eql?(other)
return false unless other.is_a?(TargetInfo)
self.path == other.path
end
end
TargetInfo = Struct.new(:path, :ino)
end
end
54 changes: 0 additions & 54 deletions test/plugin/in_tail/test_position_file.rb
Expand Up @@ -322,58 +322,4 @@ def build_files(file)
assert_equal 2, f.read_inode
end
end

sub_test_case "TargetInfo equality rules" do
sub_test_case "== operator" do
def test_equal
t1 = Fluent::Plugin::TailInput::TargetInfo.new("test", 1234)
t2 = Fluent::Plugin::TailInput::TargetInfo.new("test", 1235)

assert_equal t1, t2
end

def test_not_equal
t1 = Fluent::Plugin::TailInput::TargetInfo.new("test", 1234)
t2 = Fluent::Plugin::TailInput::TargetInfo.new("test2", 1234)

assert_not_equal t1, t2
end
end

sub_test_case "eql? method" do
def test_eql?
t1 = Fluent::Plugin::TailInput::TargetInfo.new("test", 1234)
t2 = Fluent::Plugin::TailInput::TargetInfo.new("test", 5321)

assert do
t1.eql? t2
end
end

def test_not_eql?
t1 = Fluent::Plugin::TailInput::TargetInfo.new("test2", 1234)
t2 = Fluent::Plugin::TailInput::TargetInfo.new("test3", 1234)

assert do
!t1.eql? t2
end
end
end

sub_test_case "hash" do
def test_equal
t1 = Fluent::Plugin::TailInput::TargetInfo.new("test", 1234)
t2 = Fluent::Plugin::TailInput::TargetInfo.new("test", 7321)

assert_equal t1.hash, t2.hash
end

def test_not_equal
t1 = Fluent::Plugin::TailInput::TargetInfo.new("test", 1234)
t2 = Fluent::Plugin::TailInput::TargetInfo.new("test2", 1234)

assert_not_equal t1.hash, t2.hash
end
end
end
end
21 changes: 10 additions & 11 deletions test/plugin/test_in_tail.rb
Expand Up @@ -1535,8 +1535,7 @@ def test_z_refresh_watchers
end

path = 'test/plugin/data/2010/01/20100102-030405.log'
target_info = Fluent::Plugin::TailInput::TargetInfo.new(path, Fluent::FileWrapper.stat(path).ino)
mock.proxy(plugin).detach_watcher_after_rotate_wait(plugin.instance_variable_get(:@tails)[target_info], target_info.ino)
mock.proxy(plugin).detach_watcher_after_rotate_wait(plugin.instance_variable_get(:@tails)[path], Fluent::FileWrapper.stat(path).ino)

Timecop.freeze(2010, 1, 2, 3, 4, 6) do
path = "test/plugin/data/2010/01/20100102-030406.log"
Expand Down Expand Up @@ -1893,13 +1892,13 @@ def test_should_close_watcher_after_rotate_wait
target_info = create_target_info("#{TMP_DIR}/tail.txt")
mock.proxy(Fluent::Plugin::TailInput::TailWatcher).new(target_info, anything, anything, true, true, anything, nil, anything).once
d.run(shutdown: false)
assert d.instance.instance_variable_get(:@tails)[target_info]
assert d.instance.instance_variable_get(:@tails)[target_info.path]

Timecop.travel(now + 10) do
d.instance.instance_eval do
sleep 0.1 until @tails[target_info] == nil
sleep 0.1 until @tails[target_info.path] == nil
end
assert_nil d.instance.instance_variable_get(:@tails)[target_info]
assert_nil d.instance.instance_variable_get(:@tails)[target_info.path]
end
d.instance_shutdown
end
Expand Down Expand Up @@ -1930,8 +1929,8 @@ def test_should_create_new_watcher_for_new_file_with_same_name
Timecop.travel(now + 10) do
sleep 3
d.instance.instance_eval do
@tails[path_ino] == nil
@tails[new_path_ino] != nil
@tails[path_ino.path] == nil
@tails[new_path_ino.path] != nil
end
end

Expand Down Expand Up @@ -1990,8 +1989,8 @@ def test_should_replace_target_info
while d.events.size < 1 do
sleep 0.1
end
inodes = d.instance.instance_variable_get(:@tails).keys.collect do |key|
key.ino
inodes = d.instance.instance_variable_get(:@tails).values.collect do |tw|
tw.ino
end
assert_equal([target_info.ino], inodes)

Expand All @@ -2001,8 +2000,8 @@ def test_should_replace_target_info
while d.events.size < 2 do
sleep 0.1
end
inodes = d.instance.instance_variable_get(:@tails).keys.collect do |key|
key.ino
inodes = d.instance.instance_variable_get(:@tails).values.collect do |tw|
tw.ino
end
new_target_info = create_target_info("#{TMP_DIR}/tail.txt")
assert_not_equal(target_info.ino, new_target_info.ino)
Expand Down

0 comments on commit 5804d41

Please sign in to comment.