diff --git a/.github/workflows/linux-test.yaml b/.github/workflows/linux-test.yaml index f814623805..5cf2a4f4c1 100644 --- a/.github/workflows/linux-test.yaml +++ b/.github/workflows/linux-test.yaml @@ -13,7 +13,7 @@ jobs: strategy: fail-fast: false matrix: - ruby-version: ['3.1', '3.0', '2.7', '2.6'] + ruby-version: ['3.1', '3.0', '2.7'] os: [ubuntu-latest] experimental: [false] include: diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index ef9d641afa..17b9d022a9 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -12,14 +12,19 @@ class TailInputTest < Test::Unit::TestCase include FlexMock::TestCase + def tmp_dir + File.join(File.dirname(__FILE__), "..", "tmp", "tail#{ENV['TEST_ENV_NUMBER']}", SecureRandom.hex(10)) + end + def setup Fluent::Test.setup - cleanup_directory(TMP_DIR) + @tmp_dir = tmp_dir + cleanup_directory(@tmp_dir) end def teardown super - cleanup_directory(TMP_DIR) + cleanup_directory(@tmp_dir) Fluent::Engine.stop Timecop.return end @@ -30,89 +35,48 @@ def cleanup_directory(path) return end - if Fluent.windows? - Dir.glob("*", base: path).each do |name| - begin - cleanup_file(File.join(path, name)) - rescue - # expect test driver block release already owned file handle. - end - end - else - begin - FileUtils.rm_f(path, secure:true) - rescue ArgumentError - FileUtils.rm_f(path) # For Ruby 2.6 or before. - end - if File.exist?(path) - FileUtils.remove_entry_secure(path, true) - end - end - FileUtils.mkdir_p(path) + FileUtils.remove_entry_secure(path, true) end def cleanup_file(path) - if Fluent.windows? - # On Windows, when the file or directory is removed and created - # frequently, there is a case that creating file or directory will - # fail. This situation is caused by pending file or directory - # deletion which is mentioned on win32 API document [1] - # As a workaround, execute rename and remove method. - # - # [1] https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea#files - # - file = File.join(Dir.tmpdir, SecureRandom.hex(10)) - begin - FileUtils.mv(path, file) - FileUtils.rm_rf(file, secure: true) - rescue ArgumentError - FileUtils.rm_rf(file) # For Ruby 2.6 or before. - end - if File.exist?(file) - # ensure files are closed for Windows, on which deleted files - # are still visible from filesystem - GC.start(full_mark: true, immediate_mark: true, immediate_sweep: true) - FileUtils.remove_entry_secure(file, true) - end - else - begin - FileUtils.rm_f(path, secure: true) - rescue ArgumentError - FileUtils.rm_f(path) # For Ruby 2.6 or before. - end - if File.exist?(path) - FileUtils.remove_entry_secure(path, true) - end - end + FileUtils.remove_entry_secure(path, true) end def create_target_info(path) Fluent::Plugin::TailInput::TargetInfo.new(path, Fluent::FileWrapper.stat(path).ino) end - TMP_DIR = File.dirname(__FILE__) + "/../tmp/tail#{ENV['TEST_ENV_NUMBER']}" - ROOT_CONFIG = config_element("ROOT", "", { "tag" => "t1", "rotate_wait" => "2s", "refresh_interval" => "1s" - }) - CONFIG = ROOT_CONFIG + config_element("", "", { "path" => "#{TMP_DIR}/tail.txt" }) - COMMON_CONFIG = CONFIG + config_element("", "", { "pos_file" => "#{TMP_DIR}/tail.pos" }) + }) + + def base_config + ROOT_CONFIG + config_element("", "", { "path" => "#{@tmp_dir}/tail.txt" }) + end + + def common_config + base_config + config_element("", "", { "pos_file" => "#{@tmp_dir}/tail.pos" }) + end + + def common_follow_inode_config + config_element("ROOT", "", { + "path" => "#{@tmp_dir}/tail.txt*", + "pos_file" => "#{@tmp_dir}/tail.pos", + "tag" => "t1", + "refresh_interval" => "1s", + "read_from_head" => "true", + "format" => "none", + "rotate_wait" => "1s", + "follow_inodes" => "true" + }) + end + CONFIG_READ_FROM_HEAD = config_element("", "", { "read_from_head" => true }) CONFIG_DISABLE_WATCH_TIMER = config_element("", "", { "enable_watch_timer" => false }) CONFIG_DISABLE_STAT_WATCHER = config_element("", "", { "enable_stat_watcher" => false }) CONFIG_OPEN_ON_EVERY_UPDATE = config_element("", "", { "open_on_every_update" => true }) - COMMON_FOLLOW_INODE_CONFIG = config_element("ROOT", "", { - "path" => "#{TMP_DIR}/tail.txt*", - "pos_file" => "#{TMP_DIR}/tail.pos", - "tag" => "t1", - "refresh_interval" => "1s", - "read_from_head" => "true", - "format" => "none", - "rotate_wait" => "1s", - "follow_inodes" => "true" - }) SINGLE_LINE_CONFIG = config_element("", "", { "format" => "/(?.*)/" }) PARSE_SINGLE_LINE_CONFIG = config_element("", "", {}, [config_element("parse", "", { "@type" => "/(?.*)/" })]) MULTILINE_CONFIG = config_element( @@ -145,7 +109,26 @@ def create_target_info(path) }) ]) - TAILING_GROUP_PATTERN = "/#{TMP_DIR}\/(?[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?[^_]+)_(?.+)-(?[a-z0-9]{6})\.log$/" + EX_ROTATE_WAIT = 0 + EX_FOLLOW_INODES = false + + def ex_config + config_element("", "", { + "tag" => "tail", + "path" => "test/plugin/*/%Y/%m/%Y%m%d-%H%M%S.log,test/plugin/data/log/**/*.log", + "format" => "none", + "pos_file" => "#{@tmp_dir}/tail.pos", + "read_from_head" => true, + "refresh_interval" => 30, + "rotate_wait" => "#{EX_ROTATE_WAIT}s", + "follow_inodes" => "#{EX_FOLLOW_INODES}", + }) + end + + def tailing_group_pattern + "/#{@tmp_dir}\/(?[a-z0-9]([-a-z0-9]*[a-z0-9])?(\/[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?[^_]+)_(?.+)-(?[a-z0-9]{6})\.log$/" + end + DEBUG_LOG_LEVEL = config_element("", "", { "@log_level" => "debug" }) @@ -169,21 +152,21 @@ def create_rule_directive(match_named_captures, limit) end def create_path_element(path) - config_element("source", "", { "path" => "#{TMP_DIR}/#{path}" }) + config_element("source", "", { "path" => "#{@tmp_dir}/#{path}" }) end def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) - config = use_common_conf ? COMMON_CONFIG + conf : conf + config = use_common_conf ? common_config + conf : conf Fluent::Test::Driver::Input.new(Fluent::Plugin::TailInput).configure(config) end sub_test_case "configure" do test "plain single line" do d = create_driver - assert_equal(["#{TMP_DIR}/tail.txt"], d.instance.paths) + assert_equal(["#{@tmp_dir}/tail.txt"], d.instance.paths) assert_equal("t1", d.instance.tag) assert_equal(2, d.instance.rotate_wait) - assert_equal("#{TMP_DIR}/tail.pos", d.instance.pos_file) + assert_equal("#{@tmp_dir}/tail.pos", d.instance.pos_file) assert_equal(1000, d.instance.read_lines_limit) assert_equal(-1, d.instance.read_bytes_limit_per_second) assert_equal(false, d.instance.ignore_repeated_permission_error) @@ -221,7 +204,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) test "follow_inodes w/o pos file" do assert_raise(Fluent::ConfigError) do - create_driver(CONFIG + config_element('', '', {'follow_inodes' => 'true'})) + create_driver(base_config + config_element('', '', {'follow_inodes' => 'true'})) end end @@ -315,7 +298,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) "namespace"=> "/namespace-h/", }, 0) - conf = create_group_directive(TAILING_GROUP_PATTERN, '1m', rule1, rule2, rule3, rule4) + SINGLE_LINE_CONFIG + conf = create_group_directive(tailing_group_pattern, '1m', rule1, rule2, rule3, rule4) + SINGLE_LINE_CONFIG assert_nothing_raised do create_driver(conf) end @@ -330,7 +313,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) "namespace"=> "/namespace-[d|e]/", "podname"=> "/podname-f/", }, 50) - conf = create_group_directive(TAILING_GROUP_PATTERN, '1m', rule1, rule2) + SINGLE_LINE_CONFIG + conf = create_group_directive(tailing_group_pattern, '1m', rule1, rule2) + SINGLE_LINE_CONFIG assert_raise(RuntimeError) do create_driver(conf) end @@ -350,7 +333,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) "namespace"=> "/namespace-a/", }, 100) - conf = create_group_directive(TAILING_GROUP_PATTERN, '1m', rule3, rule1, rule2) + SINGLE_LINE_CONFIG + conf = create_group_directive(tailing_group_pattern, '1m', rule3, rule1, rule2) + SINGLE_LINE_CONFIG assert_nothing_raised do d = create_driver(conf) instance = d.instance @@ -392,12 +375,12 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) sub_test_case "files should be placed in groups" do test "invalid regex pattern places files in default group" do rule1 = create_rule_directive({}, 100) ## limits default groups - conf = ROOT_CONFIG + DEBUG_LOG_LEVEL + create_group_directive(TAILING_GROUP_PATTERN, '1m', rule1) + create_path_element("test*.txt") + SINGLE_LINE_CONFIG + conf = ROOT_CONFIG + DEBUG_LOG_LEVEL + create_group_directive(tailing_group_pattern, '1m', rule1) + create_path_element("test*.txt") + SINGLE_LINE_CONFIG d = create_driver(conf, false) - File.open("#{TMP_DIR}/test1.txt", 'w') - File.open("#{TMP_DIR}/test2.txt", 'w') - File.open("#{TMP_DIR}/test3.txt", 'w') + File.open("#{@tmp_dir}/test1.txt", 'w') + File.open("#{@tmp_dir}/test2.txt", 'w') + File.open("#{@tmp_dir}/test3.txt", 'w') d.run do ## checking default group_watcher's paths @@ -406,9 +389,9 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) assert_equal(3, instance.log.logs.count{|a| a.match?("Cannot find group from metadata, Adding file in the default group\n")}) assert_equal(3, instance.group_watchers[key].size) - assert_true(instance.group_watchers[key].include? File.join(TMP_DIR, 'test1.txt')) - assert_true(instance.group_watchers[key].include? File.join(TMP_DIR, 'test2.txt')) - assert_true(instance.group_watchers[key].include? File.join(TMP_DIR, 'test3.txt')) + assert_true(instance.group_watchers[key].include? File.join(@tmp_dir, 'test1.txt')) + assert_true(instance.group_watchers[key].include? File.join(@tmp_dir, 'test2.txt')) + assert_true(instance.group_watchers[key].include? File.join(@tmp_dir, 'test3.txt')) end end @@ -427,13 +410,13 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) path_element = create_path_element("test-podname*.log") - conf = ROOT_CONFIG + create_group_directive(TAILING_GROUP_PATTERN, '1m', rule4, rule3, rule2, rule1) + path_element + SINGLE_LINE_CONFIG + conf = ROOT_CONFIG + create_group_directive(tailing_group_pattern, '1m', rule4, rule3, rule2, rule1) + path_element + SINGLE_LINE_CONFIG d = create_driver(conf, false) - file1 = File.join(TMP_DIR, "test-podname1_test-namespace1_test-container-15fabq.log") - file2 = File.join(TMP_DIR, "test-podname3_test-namespace1_test-container-15fabq.log") - file3 = File.join(TMP_DIR, "test-podname2_test-namespace2_test-container-15fabq.log") - file4 = File.join(TMP_DIR, "test-podname4_test-namespace3_test-container-15fabq.log") + file1 = File.join(@tmp_dir, "test-podname1_test-namespace1_test-container-15fabq.log") + file2 = File.join(@tmp_dir, "test-podname3_test-namespace1_test-container-15fabq.log") + file3 = File.join(@tmp_dir, "test-podname2_test-namespace2_test-container-15fabq.log") + file4 = File.join(@tmp_dir, "test-podname4_test-namespace3_test-container-15fabq.log") d.run do File.open(file1, 'w') @@ -455,7 +438,7 @@ def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) parse: PARSE_SINGLE_LINE_CONFIG) def test_emit(data) config = data - File.open("#{TMP_DIR}/tail.txt", "wb") {|f| + File.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -463,7 +446,7 @@ def test_emit(data) d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| + File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\ntest4" } end @@ -479,11 +462,11 @@ def test_emit(data) def test_emit_with_emit_unmatched_lines_true config = config_element("", "", { "format" => "/^(?test.*)/", "emit_unmatched_lines" => true }) - File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } + File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| + File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test line 1" f.puts "test line 2" f.puts "bad line 1" @@ -515,7 +498,7 @@ def test_emit_with_read_lines_limit(data) msg = 'test' * 2000 # in_tail reads 8192 bytes at once. d.run(expect_emits: num_events, timeout: 2) do - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| + File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts msg f.puts msg } @@ -530,7 +513,7 @@ def test_emit_with_read_lines_limit(data) sub_test_case "log throttling per file" do teardown do - cleanup_file("#{TMP_DIR}/tail.txt") + cleanup_file("#{@tmp_dir}/tail.txt") end sub_test_case "reads_bytes_per_second w/o throttled" do @@ -561,7 +544,7 @@ def test_emit_with_read_bytes_limit_per_second(data) d = create_driver(config) d.run(expect_emits: 2) do - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| + File.open("#{@tmp_dir}/tail.txt", "ab") {|f| 100.times do f.puts msg end @@ -584,7 +567,7 @@ def test_read_bytes_limit_precede_read_lines_limit start_time = Fluent::Clock.now d = create_driver(config) d.run(expect_emits: 2) do - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| + File.open("#{@tmp_dir}/tail.txt", "ab") {|f| 8000.times do f.puts msg end @@ -623,7 +606,7 @@ def test_emit_with_read_bytes_limit_per_second(data) io_handler end - File.open("#{TMP_DIR}/tail.txt", "ab") do |f| + File.open("#{@tmp_dir}/tail.txt", "ab") do |f| 100.times do f.puts msg end @@ -633,7 +616,7 @@ def test_emit_with_read_bytes_limit_per_second(data) d.run do start_time = Fluent::Clock.now while Fluent::Clock.now - start_time < 0.8 do - File.open("#{TMP_DIR}/tail.txt", "ab") do |f| + File.open("#{@tmp_dir}/tail.txt", "ab") do |f| f.puts msg f.flush end @@ -651,7 +634,7 @@ def test_longer_than_rotate_wait num_lines = 1024 * 3 msg = "08bytes" - File.open("#{TMP_DIR}/tail.txt", "wb") do |f| + File.open("#{@tmp_dir}/tail.txt", "wb") do |f| f.write("#{msg}\n" * num_lines) end @@ -668,8 +651,8 @@ def test_longer_than_rotate_wait d.run(timeout: 10) do while d.events.size < num_lines do if d.events.size > 0 && !rotated - cleanup_file("#{TMP_DIR}/tail.txt") - FileUtils.touch("#{TMP_DIR}/tail.txt") + cleanup_file("#{@tmp_dir}/tail.txt") + FileUtils.touch("#{@tmp_dir}/tail.txt") rotated = true end sleep 0.3 @@ -687,7 +670,7 @@ def test_shorter_than_rotate_wait num_lines = 1024 * 2 msg = "08bytes" - File.open("#{TMP_DIR}/tail.txt", "wb") do |f| + File.open("#{@tmp_dir}/tail.txt", "wb") do |f| f.write("#{msg}\n" * num_lines) end @@ -714,8 +697,8 @@ def test_shorter_than_rotate_wait d.run(timeout: 10) do until detached do if d.events.size > 0 && !rotated - cleanup_file("#{TMP_DIR}/tail.txt") - FileUtils.touch("#{TMP_DIR}/tail.txt") + cleanup_file("#{@tmp_dir}/tail.txt") + FileUtils.touch("#{@tmp_dir}/tail.txt") rotated = true end sleep 0.3 @@ -735,7 +718,7 @@ def test_shorter_than_rotate_wait parse: CONFIG_READ_FROM_HEAD + PARSE_SINGLE_LINE_CONFIG) def test_emit_with_read_from_head(data) config = data - File.open("#{TMP_DIR}/tail.txt", "wb") {|f| + File.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -743,7 +726,7 @@ def test_emit_with_read_from_head(data) d = create_driver(config) d.run(expect_emits: 2) do - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| + File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } @@ -761,7 +744,7 @@ def test_emit_with_read_from_head(data) parse: CONFIG_DISABLE_WATCH_TIMER + PARSE_SINGLE_LINE_CONFIG) def test_emit_without_watch_timer(data) config = data - File.open("#{TMP_DIR}/tail.txt", "wb") {|f| + File.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -769,7 +752,7 @@ def test_emit_without_watch_timer(data) d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| + File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } @@ -788,12 +771,12 @@ def test_watch_wildcard_path_without_watch_timer omit "need inotify" unless Fluent.linux? config = config_element("ROOT", "", { - "path" => "#{TMP_DIR}/tail*.txt", + "path" => "#{@tmp_dir}/tail*.txt", "tag" => "t1", }) config = config + CONFIG_DISABLE_WATCH_TIMER + SINGLE_LINE_CONFIG - File.open("#{TMP_DIR}/tail.txt", "wb") {|f| + File.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -801,7 +784,7 @@ def test_watch_wildcard_path_without_watch_timer d = create_driver(config, false) d.run(expect_emits: 1, timeout: 1) do - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| + File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } @@ -819,7 +802,7 @@ def test_watch_wildcard_path_without_watch_timer parse: CONFIG_DISABLE_STAT_WATCHER + PARSE_SINGLE_LINE_CONFIG) def test_emit_with_disable_stat_watcher(data) config = data - File.open("#{TMP_DIR}/tail.txt", "wb") {|f| + File.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -827,7 +810,7 @@ def test_emit_with_disable_stat_watcher(data) d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| + File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } @@ -843,7 +826,7 @@ def test_always_read_from_head_on_detecting_a_new_file d = create_driver(SINGLE_LINE_CONFIG) d.run(expect_emits: 1, timeout: 3) do - File.open("#{TMP_DIR}/tail.txt", "wb") {|f| + File.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1\ntest2\n" } end @@ -869,11 +852,14 @@ class TestWithSystem < self def setup omit "NTFS doesn't support UNIX like permissions" if Fluent.windows? + super # Store default permission @default_permission = system_config.instance_variable_get(:@file_permission) end def teardown + return if Fluent.windows? + super # Restore default permission system_config.instance_variable_set(:@file_permission, @default_permission) end @@ -887,7 +873,7 @@ def test_emit_with_system system_conf = parse_system(CONFIG_SYSTEM) sc = Fluent::SystemConfig.new(system_conf) Fluent::Engine.init(sc) - File.open("#{TMP_DIR}/tail.txt", "wb") {|f| + File.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -895,7 +881,7 @@ def test_emit_with_system d = create_driver d.run(expect_emits: 1) do - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| + File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } @@ -915,16 +901,18 @@ def test_emit_with_system end sub_test_case "rotate file" do + def create_driver(conf = SINGLE_LINE_CONFIG) + config = common_config + conf + Fluent::Test::Driver::Input.new(Fluent::Plugin::TailInput).configure(config) + end + data(flat: SINGLE_LINE_CONFIG, parse: PARSE_SINGLE_LINE_CONFIG) def test_rotate_file(data) config = data events = sub_test_rotate_file(config, expect_emits: 2) - assert_equal(4, events.length) - assert_equal({"message" => "test3"}, events[0][2]) - assert_equal({"message" => "test4"}, events[1][2]) - assert_equal({"message" => "test5"}, events[2][2]) - assert_equal({"message" => "test6"}, events[3][2]) + assert_equal(3.upto(6).collect { |i| {"message" => "test#{i}"} }, + events.collect { |event| event[2] }) end data(flat: CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG, @@ -932,13 +920,8 @@ def test_rotate_file(data) def test_rotate_file_with_read_from_head(data) config = data events = sub_test_rotate_file(config, expect_records: 6) - assert_equal(6, events.length) - assert_equal({"message" => "test1"}, events[0][2]) - assert_equal({"message" => "test2"}, events[1][2]) - assert_equal({"message" => "test3"}, events[2][2]) - assert_equal({"message" => "test4"}, events[3][2]) - assert_equal({"message" => "test5"}, events[4][2]) - assert_equal({"message" => "test6"}, events[5][2]) + assert_equal(1.upto(6).collect { |i| {"message" => "test#{i}"} }, + events.collect { |event| event[2] }) end data(flat: CONFIG_OPEN_ON_EVERY_UPDATE + CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG, @@ -946,13 +929,8 @@ def test_rotate_file_with_read_from_head(data) def test_rotate_file_with_open_on_every_update(data) config = data events = sub_test_rotate_file(config, expect_records: 6) - assert_equal(6, events.length) - assert_equal({"message" => "test1"}, events[0][2]) - assert_equal({"message" => "test2"}, events[1][2]) - assert_equal({"message" => "test3"}, events[2][2]) - assert_equal({"message" => "test4"}, events[3][2]) - assert_equal({"message" => "test5"}, events[4][2]) - assert_equal({"message" => "test6"}, events[5][2]) + assert_equal(1.upto(6).collect { |i| {"message" => "test#{i}"} }, + events.collect { |event| event[2] }) end data(flat: SINGLE_LINE_CONFIG, @@ -960,26 +938,21 @@ def test_rotate_file_with_open_on_every_update(data) def test_rotate_file_with_write_old(data) config = data events = sub_test_rotate_file(config, expect_emits: 3) { |rotated_file| - File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } + File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } rotated_file.puts "test7" rotated_file.puts "test8" rotated_file.flush sleep 1 - File.open("#{TMP_DIR}/tail.txt", "ab") { |f| + File.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "test5" f.puts "test6" } } # This test sometimes fails and it shows a potential bug of in_tail # https://github.com/fluent/fluentd/issues/1434 - assert_equal(6, events.length) - assert_equal({"message" => "test3"}, events[0][2]) - assert_equal({"message" => "test4"}, events[1][2]) - assert_equal({"message" => "test7"}, events[2][2]) - assert_equal({"message" => "test8"}, events[3][2]) - assert_equal({"message" => "test5"}, events[4][2]) - assert_equal({"message" => "test6"}, events[5][2]) + assert_equal([3, 4, 7, 8, 5, 6].collect { |i| {"message" => "test#{i}"} }, + events.collect { |event| event[2] }) end data(flat: SINGLE_LINE_CONFIG, @@ -991,21 +964,19 @@ def test_rotate_file_with_write_old_and_no_new_file(data) rotated_file.puts "test8" rotated_file.flush } - assert_equal(4, events.length) - assert_equal({"message" => "test3"}, events[0][2]) - assert_equal({"message" => "test4"}, events[1][2]) - assert_equal({"message" => "test7"}, events[2][2]) - assert_equal({"message" => "test8"}, events[3][2]) + assert_equal([3, 4, 7, 8].collect { |i| {"message" => "test#{i}"} }, + events.collect { |event| event[2] }) end - def sub_test_rotate_file(config = nil, expect_emits: nil, expect_records: nil, timeout: nil) - file = Fluent::FileWrapper.open("#{TMP_DIR}/tail.txt", "wb") + def sub_test_rotate_file(config = nil, expect_emits: nil, expect_records: nil, timeout: 5) + file = Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") file.puts "test1" file.puts "test2" file.flush d = create_driver(config) d.run(expect_emits: expect_emits, expect_records: expect_records, timeout: timeout) do + sleep(0.1) while d.instance.instance_variable_get(:@startup) size = d.emit_count file.puts "test3" file.puts "test4" @@ -1015,18 +986,18 @@ def sub_test_rotate_file(config = nil, expect_emits: nil, expect_records: nil, t if Fluent.windows? file.close - FileUtils.mv("#{TMP_DIR}/tail.txt", "#{TMP_DIR}/tail2.txt", force: true) - file = File.open("#{TMP_DIR}/tail.txt", "ab") + FileUtils.mv("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail2.txt", force: true) + file = Fluent::FileWrapper.open("#{@tmp_dir}/tail2.txt", "ab") else - FileUtils.mv("#{TMP_DIR}/tail.txt", "#{TMP_DIR}/tail2.txt") + FileUtils.mv("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail2.txt") end if block_given? yield file else - File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } + File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } sleep 1 - File.open("#{TMP_DIR}/tail.txt", "ab") { |f| + File.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "test5" f.puts "test6" } @@ -1040,10 +1011,8 @@ def sub_test_rotate_file(config = nil, expect_emits: nil, expect_records: nil, t end def test_truncate_file - omit "Permission denied error happen on Windows. Need fix" if Fluent.windows? - config = SINGLE_LINE_CONFIG - File.open("#{TMP_DIR}/tail.txt", "wb") {|f| + File.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" f.flush @@ -1052,12 +1021,18 @@ def test_truncate_file d = create_driver(config) d.run(expect_emits: 2) do - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| + File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\ntest4" f.flush } waiting(2) { sleep 0.1 until d.events.length == 2 } - File.truncate("#{TMP_DIR}/tail.txt", 6) + if Fluent.windows? + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| + f.puts("test1"); + } + else + File.truncate("#{@tmp_dir}/tail.txt", 6) + end end expected = { @@ -1076,10 +1051,8 @@ def test_truncate_file end def test_move_truncate_move_back - omit "Permission denied error happen on Windows. Need fix" if Fluent.windows? - config = SINGLE_LINE_CONFIG - File.open("#{TMP_DIR}/tail.txt", "wb") {|f| + File.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -1088,17 +1061,23 @@ def test_move_truncate_move_back d.run(expect_emits: 1) do if Fluent.windows? - FileUtils.mv("#{TMP_DIR}/tail.txt", "#{TMP_DIR}/tail2.txt", force: true) + FileUtils.mv("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail2.txt", force: true) else - FileUtils.mv("#{TMP_DIR}/tail.txt", "#{TMP_DIR}/tail2.txt") + FileUtils.mv("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail2.txt") end sleep(1) - File.truncate("#{TMP_DIR}/tail2.txt", 6) + if Fluent.windows? + Fluent::FileWrapper.open("#{@tmp_dir}/tail2.txt", "wb") { |f| + f.puts("test1"); + } + else + File.truncate("#{@tmp_dir}/tail2.txt", 6) + end sleep(1) if Fluent.windows? - FileUtils.mv("#{TMP_DIR}/tail2.txt", "#{TMP_DIR}/tail.txt", force: true) + FileUtils.mv("#{@tmp_dir}/tail2.txt", "#{@tmp_dir}/tail.txt", force: true) else - FileUtils.mv("#{TMP_DIR}/tail2.txt", "#{TMP_DIR}/tail.txt") + FileUtils.mv("#{@tmp_dir}/tail2.txt", "#{@tmp_dir}/tail.txt") end end @@ -1110,17 +1089,17 @@ def test_move_truncate_move_back end def test_lf - File.open("#{TMP_DIR}/tail.txt", "wb") {|f| } + File.open("#{@tmp_dir}/tail.txt", "wb") {|f| } d = create_driver d.run(expect_emits: 1) do - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| + File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.print "test3" } sleep 1 - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| + File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test4" } end @@ -1131,12 +1110,12 @@ def test_lf end def test_whitespace - File.open("#{TMP_DIR}/tail.txt", "wb") {|f| } + File.open("#{@tmp_dir}/tail.txt", "wb") {|f| } d = create_driver d.run(expect_emits: 1) do - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| + File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts " " # 4 spaces f.puts " 4 spaces" f.puts "4 spaces " @@ -1167,7 +1146,7 @@ def test_encoding(data) d = create_driver(CONFIG_READ_FROM_HEAD + encoding_config) d.run(expect_emits: 1) do - File.open("#{TMP_DIR}/tail.txt", "wb") {|f| + File.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test" } end @@ -1189,7 +1168,7 @@ def test_from_encoding utf8_message = cp932_message.encode(Encoding::UTF_8) d.run(expect_emits: 1) do - File.open("#{TMP_DIR}/tail.txt", "w:cp932") {|f| + File.open("#{@tmp_dir}/tail.txt", "w:cp932") {|f| f.puts cp932_message } end @@ -1212,7 +1191,7 @@ def test_from_encoding_utf16 utf8_message = utf16_message.encode(Encoding::UTF_8).strip d.run(expect_emits: 1) do - File.open("#{TMP_DIR}/tail.txt", "w:utf-16le") { |f| + File.open("#{@tmp_dir}/tail.txt", "w:utf-16le") { |f| f.write utf16_message } end @@ -1233,7 +1212,7 @@ def test_encoding_with_bad_character d = create_driver(conf) d.run(expect_emits: 1) do - File.open("#{TMP_DIR}/tail.txt", "w") { |f| + File.open("#{@tmp_dir}/tail.txt", "w") { |f| f.write "te\x86st\n" } end @@ -1248,11 +1227,11 @@ def test_encoding_with_bad_character parse: PARSE_MULTILINE_CONFIG) def test_multiline(data) config = data - File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } + File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{TMP_DIR}/tail.txt", "ab") { |f| + File.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "f test1" f.puts "s test2" f.puts "f test3" @@ -1276,11 +1255,11 @@ def test_multiline(data) parse: PARSE_MULTILINE_CONFIG) def test_multiline_with_emit_unmatched_lines_true(data) config = data + config_element("", "", { "emit_unmatched_lines" => true }) - File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } + File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{TMP_DIR}/tail.txt", "ab") { |f| + File.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "f test1" f.puts "s test2" f.puts "f test3" @@ -1306,11 +1285,11 @@ def test_multiline_with_emit_unmatched_lines_true(data) parse: PARSE_MULTILINE_CONFIG_WITH_NEWLINE) def test_multiline_with_emit_unmatched_lines2(data) config = data + config_element("", "", { "emit_unmatched_lines" => true }) - File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } + File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } d = create_driver(config) d.run(expect_emits: 0, timeout: 1) do - File.open("#{TMP_DIR}/tail.txt", "ab") { |f| + File.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "s test0" f.puts "f test1" f.puts "f test2" @@ -1332,7 +1311,7 @@ def test_multiline_with_emit_unmatched_lines2(data) data(flat: MULTILINE_CONFIG, parse: PARSE_MULTILINE_CONFIG) def test_multiline_with_flush_interval(data) - File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } + File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } config = data + config_element("", "", { "multiline_flush_interval" => "2s" }) d = create_driver(config) @@ -1340,7 +1319,7 @@ def test_multiline_with_flush_interval(data) assert_equal(2, d.instance.multiline_flush_interval) d.run(expect_emits: 1) do - File.open("#{TMP_DIR}/tail.txt", "ab") { |f| + File.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "f test1" f.puts "s test2" f.puts "f test3" @@ -1375,7 +1354,7 @@ def test_multiline_encoding_of_flushed_record(data) d = create_driver(config + encoding_config) d.run(expect_emits: 1) do - File.open("#{TMP_DIR}/tail.txt", "wb") { |f| + File.open("#{@tmp_dir}/tail.txt", "wb") { |f| f.puts "s test" } end @@ -1398,7 +1377,7 @@ def test_multiline_from_encoding_of_flushed_record cp932_message = "s \x82\xCD\x82\xEB\x81\x5B\x82\xED\x81\x5B\x82\xE9\x82\xC7".force_encoding(Encoding::CP932) utf8_message = "\x82\xCD\x82\xEB\x81\x5B\x82\xED\x81\x5B\x82\xE9\x82\xC7".encode(Encoding::UTF_8, Encoding::CP932) d.run(expect_emits: 1) do - File.open("#{TMP_DIR}/tail.txt", "w:cp932") { |f| + File.open("#{@tmp_dir}/tail.txt", "w:cp932") { |f| f.puts cp932_message } end @@ -1430,11 +1409,11 @@ def test_multiline_from_encoding_of_flushed_record ) def test_multiline_with_multiple_formats(data) config = data - File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } + File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{TMP_DIR}/tail.txt", "ab") { |f| + File.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "f test1" f.puts "s test2" f.puts "f test3" @@ -1470,7 +1449,7 @@ def test_multiline_with_multiple_formats(data) ]) ) def test_multilinelog_with_multiple_paths(data) - files = ["#{TMP_DIR}/tail1.txt", "#{TMP_DIR}/tail2.txt"] + files = ["#{@tmp_dir}/tail1.txt", "#{@tmp_dir}/tail2.txt"] files.each { |file| File.open(file, "wb") { |f| } } config = data + config_element("", "", { @@ -1515,12 +1494,12 @@ def test_multilinelog_with_multiple_paths(data) ]) ) def test_multiline_without_firstline(data) - File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } + File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } config = data d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{TMP_DIR}/tail.txt", "ab") { |f| + File.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "foo 1" f.puts "bar 1" f.puts "baz 1" @@ -1540,33 +1519,20 @@ def test_multiline_without_firstline(data) sub_test_case "path" do # * path test # TODO: Clean up tests - EX_ROTATE_WAIT = 0 - EX_FOLLOW_INODES = false - - EX_CONFIG = config_element("", "", { - "tag" => "tail", - "path" => "test/plugin/*/%Y/%m/%Y%m%d-%H%M%S.log,test/plugin/data/log/**/*.log", - "format" => "none", - "pos_file" => "#{TMP_DIR}/tail.pos", - "read_from_head" => true, - "refresh_interval" => 30, - "rotate_wait" => "#{EX_ROTATE_WAIT}s", - "follow_inodes" => "#{EX_FOLLOW_INODES}", - }) def test_expand_paths ex_paths = [ create_target_info('test/plugin/data/2010/01/20100102-030405.log'), create_target_info('test/plugin/data/log/foo/bar.log'), create_target_info('test/plugin/data/log/test.log') ] - plugin = create_driver(EX_CONFIG, false).instance + plugin = create_driver(ex_config, false).instance flexstub(Time) do |timeclass| timeclass.should_receive(:now).with_no_args.and_return(Time.new(2010, 1, 2, 3, 4, 5)) assert_equal(ex_paths, plugin.expand_paths.values.sort_by { |path_ino| path_ino.path }) end # Test exclusion - exclude_config = EX_CONFIG + config_element("", "", { "exclude_path" => %Q(["#{ex_paths.last.path}"]) }) + exclude_config = ex_config + config_element("", "", { "exclude_path" => %Q(["#{ex_paths.last.path}"]) }) plugin = create_driver(exclude_config, false).instance assert_equal(ex_paths - [ex_paths.last], plugin.expand_paths.values.sort_by { |path_ino| path_ino.path }) end @@ -1576,9 +1542,9 @@ def test_expand_paths_with_duplicate_configuration create_target_info('test/plugin/data/log/foo/bar.log'), create_target_info('test/plugin/data/log/test.log') ] - duplicate_config = EX_CONFIG.dup + duplicate_config = ex_config.dup duplicate_config["path"]="test/plugin/data/log/**/*.log, test/plugin/data/log/**/*.log" - plugin = create_driver(EX_CONFIG, false).instance + plugin = create_driver(ex_config, false).instance assert_equal(expanded_paths, plugin.expand_paths.values.sort_by { |path_ino| path_ino.path }) end @@ -1589,7 +1555,7 @@ def test_expand_paths_with_timezone create_target_info('test/plugin/data/log/test.log') ] ['Asia/Taipei', '+08'].each do |tz_type| - taipei_config = EX_CONFIG + config_element("", "", {"path_timezone" => tz_type}) + taipei_config = ex_config + config_element("", "", {"path_timezone" => tz_type}) plugin = create_driver(taipei_config, false).instance # Test exclude @@ -1620,7 +1586,7 @@ def test_log_file_without_extension "tag" => "tail", "path" => "test/plugin/data/log/**/*", "format" => "none", - "pos_file" => "#{TMP_DIR}/tail.pos" + "pos_file" => "#{@tmp_dir}/tail.pos" }) plugin = create_driver(config, false).instance @@ -1630,20 +1596,20 @@ def test_log_file_without_extension def test_unwatched_files_should_be_removed config = config_element("", "", { "tag" => "tail", - "path" => "#{TMP_DIR}/*.txt", + "path" => "#{@tmp_dir}/*.txt", "format" => "none", - "pos_file" => "#{TMP_DIR}/tail.pos", + "pos_file" => "#{@tmp_dir}/tail.pos", "read_from_head" => true, "refresh_interval" => 1, }) d = create_driver(config, false) d.end_if { d.instance.instance_variable_get(:@tails).keys.size >= 1 } d.run(expect_emits: 1, shutdown: false) do - File.open("#{TMP_DIR}/tail.txt", "ab") { |f| f.puts "test3\n" } + File.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "test3\n" } end - cleanup_directory(TMP_DIR) - waiting(20) { sleep 0.1 until Dir.glob("#{TMP_DIR}/*.txt").size == 0 } # Ensure file is deleted on Windows + cleanup_directory(@tmp_dir) + waiting(20) { sleep 0.1 until Dir.glob("#{@tmp_dir}/*.txt").size == 0 } # Ensure file is deleted on Windows waiting(5) { sleep 0.1 until d.instance.instance_variable_get(:@tails).keys.size <= 0 } assert_equal(0, d.instance.instance_variable_get(:@tails).keys.size) @@ -1706,47 +1672,47 @@ def capability_enabled? def test_pos_file_dir_creation config = config_element("", "", { "tag" => "tail", - "path" => "#{TMP_DIR}/*.txt", + "path" => "#{@tmp_dir}/*.txt", "format" => "none", - "pos_file" => "#{TMP_DIR}/pos/tail.pos", + "pos_file" => "#{@tmp_dir}/pos/tail.pos", "read_from_head" => true, "refresh_interval" => 1 }) - assert_path_not_exist("#{TMP_DIR}/pos") + assert_path_not_exist("#{@tmp_dir}/pos") d = create_driver(config, false) d.run - assert_path_exist("#{TMP_DIR}/pos") - assert_equal('755', File.stat("#{TMP_DIR}/pos").mode.to_s(8)[-3, 3]) + assert_path_exist("#{@tmp_dir}/pos") + assert_equal('755', File.stat("#{@tmp_dir}/pos").mode.to_s(8)[-3, 3]) ensure - cleanup_directory(TMP_DIR) + cleanup_directory(@tmp_dir) end def test_pos_file_dir_creation_with_system_dir_permission config = config_element("", "", { "tag" => "tail", - "path" => "#{TMP_DIR}/*.txt", + "path" => "#{@tmp_dir}/*.txt", "format" => "none", - "pos_file" => "#{TMP_DIR}/pos/tail.pos", + "pos_file" => "#{@tmp_dir}/pos/tail.pos", "read_from_head" => true, "refresh_interval" => 1 }) - assert_path_not_exist("#{TMP_DIR}/pos") + assert_path_not_exist("#{@tmp_dir}/pos") Fluent::SystemConfig.overwrite_system_config({ "dir_permission" => "744" }) do d = create_driver(config, false) d.run end - assert_path_exist("#{TMP_DIR}/pos") + assert_path_exist("#{@tmp_dir}/pos") if Fluent.windows? - assert_equal('755', File.stat("#{TMP_DIR}/pos").mode.to_s(8)[-3, 3]) + assert_equal('755', File.stat("#{@tmp_dir}/pos").mode.to_s(8)[-3, 3]) else - assert_equal('744', File.stat("#{TMP_DIR}/pos").mode.to_s(8)[-3, 3]) + assert_equal('744', File.stat("#{@tmp_dir}/pos").mode.to_s(8)[-3, 3]) end ensure - cleanup_directory(TMP_DIR) + cleanup_directory(@tmp_dir) end def test_z_refresh_watchers @@ -1755,7 +1721,7 @@ def test_z_refresh_watchers create_target_info('test/plugin/data/log/foo/bar.log'), create_target_info('test/plugin/data/log/test.log'), ] - plugin = create_driver(EX_CONFIG, false).instance + plugin = create_driver(ex_config, false).instance sio = StringIO.new plugin.instance_eval do @pf = Fluent::Plugin::TailInput::PositionFile.load(sio, EX_FOLLOW_INODES, {}, logger: $log) @@ -1798,9 +1764,9 @@ def test_z_refresh_watchers test 'type of pos_file_compaction_interval is time' do tail = { "tag" => "tail", - "path" => "#{TMP_DIR}/*.txt", + "path" => "#{@tmp_dir}/*.txt", "format" => "none", - "pos_file" => "#{TMP_DIR}/pos/tail.pos", + "pos_file" => "#{@tmp_dir}/pos/tail.pos", "refresh_interval" => 1, "read_from_head" => true, 'pos_file_compaction_interval' => '24h', @@ -1817,7 +1783,7 @@ def test_z_refresh_watchers DummyWatcher = Struct.new("DummyWatcher", :tag) def test_tag - d = create_driver(EX_CONFIG, false) + d = create_driver(ex_config, false) d.run {} plugin = d.instance mock(plugin.router).emit_stream('tail', anything).once @@ -1901,13 +1867,13 @@ def test_tag_prefix_and_suffix_ignore test 'max_line_size' do |(label, size)| config = config_element("", "", { "tag" => "max_line_size", - "path" => "#{TMP_DIR}/with_long_lines.txt", + "path" => "#{@tmp_dir}/with_long_lines.txt", "format" => "none", "read_from_head" => true, "max_line_size" => label, "log_level" => "debug" }) - File.open("#{TMP_DIR}/with_long_lines.txt", "w+") do |f| + File.open("#{@tmp_dir}/with_long_lines.txt", "w+") do |f| f.puts "foo" f.puts "x" * size # 'x' * size + \n > @max_line_size f.puts "bar" @@ -1933,7 +1899,7 @@ def test_tag_prefix_and_suffix_ignore # Ensure that no fatal exception is raised when a file is missing and that # files that do exist are still tailed as expected. def test_missing_file - File.open("#{TMP_DIR}/tail.txt", "wb") {|f| + File.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -1942,16 +1908,16 @@ def test_missing_file # since their interactions with the filesystem differ. config1 = config_element("", "", { "tag" => "t1", - "path" => "#{TMP_DIR}/non_existent_file.txt,#{TMP_DIR}/tail.txt", + "path" => "#{@tmp_dir}/non_existent_file.txt,#{@tmp_dir}/tail.txt", "format" => "none", "rotate_wait" => "2s", - "pos_file" => "#{TMP_DIR}/tail.pos" + "pos_file" => "#{@tmp_dir}/tail.pos" }) config2 = config1 + config_element("", "", { "read_from_head" => true }) [config1, config2].each do |config| d = create_driver(config, false) d.run(expect_emits: 1) do - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| + File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } @@ -1967,19 +1933,19 @@ def test_missing_file sub_test_case 'inode_processing' do def test_should_delete_file_pos_entry_for_non_existing_file_with_follow_inodes - config = COMMON_FOLLOW_INODE_CONFIG + config = common_follow_inode_config - path = "#{TMP_DIR}/tail.txt" + path = "#{@tmp_dir}/tail.txt" ino = 1 pos = 1234 - File.open("#{TMP_DIR}/tail.pos", "wb") {|f| + File.open("#{@tmp_dir}/tail.pos", "wb") {|f| f.puts ("%s\t%016x\t%016x\n" % [path, pos, ino]) } d = create_driver(config, false) d.run - pos_file = File.open("#{TMP_DIR}/tail.pos", "r") + pos_file = File.open("#{@tmp_dir}/tail.pos", "r") pos_file.pos = 0 assert_raise(EOFError) do @@ -1988,21 +1954,21 @@ def test_should_delete_file_pos_entry_for_non_existing_file_with_follow_inodes end def test_should_write_latest_offset_after_rotate_wait - config = COMMON_FOLLOW_INODE_CONFIG - File.open("#{TMP_DIR}/tail.txt", "wb") {|f| + config = common_follow_inode_config + File.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } d = create_driver(config, false) d.run(expect_emits: 2, shutdown: false) do - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| f.puts "test3\n"} - FileUtils.move("#{TMP_DIR}/tail.txt", "#{TMP_DIR}/tail.txt" + "1") + File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} + FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt" + "1") sleep 1 - File.open("#{TMP_DIR}/tail.txt" + "1", "ab") {|f| f.puts "test4\n"} + File.open("#{@tmp_dir}/tail.txt" + "1", "ab") {|f| f.puts "test4\n"} end - pos_file = File.open("#{TMP_DIR}/tail.pos", "r") + pos_file = File.open("#{@tmp_dir}/tail.pos", "r") pos_file.pos = 0 line_parts = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(pos_file.readline) waiting(5) { @@ -2019,16 +1985,16 @@ def test_should_write_latest_offset_after_rotate_wait def test_should_remove_deleted_file config = config_element("", "", {"format" => "none"}) - path = "#{TMP_DIR}/tail.txt" + path = "#{@tmp_dir}/tail.txt" ino = 1 pos = 1234 - File.open("#{TMP_DIR}/tail.pos", "wb") {|f| + File.open("#{@tmp_dir}/tail.pos", "wb") {|f| f.puts ("%s\t%016x\t%016x\n" % [path, pos, ino]) } d = create_driver(config) d.run do - pos_file = File.open("#{TMP_DIR}/tail.pos", "r") + pos_file = File.open("#{@tmp_dir}/tail.pos", "r") pos_file.pos = 0 assert_equal([], pos_file.readlines) end @@ -2036,8 +2002,8 @@ def test_should_remove_deleted_file def test_should_mark_file_unwatched_after_limit_recently_modified_and_rotate_wait config = config_element("ROOT", "", { - "path" => "#{TMP_DIR}/tail.txt*", - "pos_file" => "#{TMP_DIR}/tail.pos", + "path" => "#{@tmp_dir}/tail.txt*", + "pos_file" => "#{@tmp_dir}/tail.pos", "tag" => "t1", "rotate_wait" => "1s", "refresh_interval" => "1s", @@ -2049,14 +2015,14 @@ def test_should_mark_file_unwatched_after_limit_recently_modified_and_rotate_wai d = create_driver(config, false) - File.open("#{TMP_DIR}/tail.txt", "wb") {|f| + File.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } - target_info = create_target_info("#{TMP_DIR}/tail.txt") + target_info = create_target_info("#{@tmp_dir}/tail.txt") d.run(expect_emits: 1, shutdown: false) do - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| f.puts "test3\n"} + File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} end @@ -2074,8 +2040,8 @@ def test_should_mark_file_unwatched_after_limit_recently_modified_and_rotate_wai def test_should_read_from_head_on_file_renaming_with_star_in_pattern config = config_element("ROOT", "", { - "path" => "#{TMP_DIR}/tail.txt*", - "pos_file" => "#{TMP_DIR}/tail.pos", + "path" => "#{@tmp_dir}/tail.txt*", + "pos_file" => "#{@tmp_dir}/tail.pos", "tag" => "t1", "rotate_wait" => "10s", "refresh_interval" => "1s", @@ -2087,14 +2053,14 @@ def test_should_read_from_head_on_file_renaming_with_star_in_pattern d = create_driver(config, false) - File.open("#{TMP_DIR}/tail.txt", "wb") {|f| + File.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } d.run(expect_emits: 2, shutdown: false) do - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| f.puts "test3\n"} - FileUtils.move("#{TMP_DIR}/tail.txt", "#{TMP_DIR}/tail.txt1") + File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} + FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1") end events = d.events @@ -2103,20 +2069,20 @@ def test_should_read_from_head_on_file_renaming_with_star_in_pattern end def test_should_not_read_from_head_on_rotation_when_watching_inodes - config = COMMON_FOLLOW_INODE_CONFIG + config = common_follow_inode_config d = create_driver(config, false) - File.open("#{TMP_DIR}/tail.txt", "wb") {|f| + File.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } d.run(expect_emits: 1, shutdown: false) do - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| f.puts "test3\n"} + File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} end - FileUtils.move("#{TMP_DIR}/tail.txt", "#{TMP_DIR}/tail.txt1") + FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1") Timecop.travel(Time.now + 10) do sleep 2 events = d.events @@ -2127,23 +2093,23 @@ def test_should_not_read_from_head_on_rotation_when_watching_inodes end def test_should_mark_file_unwatched_if_same_name_file_created_with_different_inode - config = COMMON_FOLLOW_INODE_CONFIG + config = common_follow_inode_config d = create_driver(config, false) - File.open("#{TMP_DIR}/tail.txt", "wb") {|f| + File.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } - target_info = create_target_info("#{TMP_DIR}/tail.txt") + target_info = create_target_info("#{@tmp_dir}/tail.txt") d.run(expect_emits: 2, shutdown: false) do - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| f.puts "test3\n"} - cleanup_file("#{TMP_DIR}/tail.txt") - File.open("#{TMP_DIR}/tail.txt", "wb") {|f| f.puts "test4\n"} + File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} + cleanup_file("#{@tmp_dir}/tail.txt") + File.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test4\n"} end - new_target_info = create_target_info("#{TMP_DIR}/tail.txt") + new_target_info = create_target_info("#{@tmp_dir}/tail.txt") pos_file = d.instance.instance_variable_get(:@pf) @@ -2159,7 +2125,7 @@ def test_should_mark_file_unwatched_if_same_name_file_created_with_different_ino def test_should_close_watcher_after_rotate_wait now = Time.now - config = COMMON_FOLLOW_INODE_CONFIG + config_element('', '', {"rotate_wait" => "1s", "limit_recently_modified" => "1s"}) + config = common_follow_inode_config + config_element('', '', {"rotate_wait" => "1s", "limit_recently_modified" => "1s"}) d = create_driver(config, false) d.instance.instance_eval do @@ -2172,11 +2138,11 @@ def test_should_close_watcher_after_rotate_wait @metrics = Fluent::Plugin::TailInput::MetricsInfo.new(opened_file_metrics, closed_file_metrics, rotated_file_metrics) end - File.open("#{TMP_DIR}/tail.txt", "wb") {|f| + File.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } - target_info = create_target_info("#{TMP_DIR}/tail.txt") + target_info = create_target_info("#{@tmp_dir}/tail.txt") mock.proxy(Fluent::Plugin::TailInput::TailWatcher).new(target_info, anything, anything, true, true, anything, nil, anything, anything).once d.run(shutdown: false) assert d.instance.instance_variable_get(:@tails)[target_info.path] @@ -2192,26 +2158,26 @@ def test_should_close_watcher_after_rotate_wait def test_should_create_new_watcher_for_new_file_with_same_name now = Time.now - config = COMMON_FOLLOW_INODE_CONFIG + config_element('', '', {"limit_recently_modified" => "2s"}) + config = common_follow_inode_config + config_element('', '', {"limit_recently_modified" => "2s"}) d = create_driver(config, false) - File.open("#{TMP_DIR}/tail.txt", "wb") {|f| + File.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } - path_ino = create_target_info("#{TMP_DIR}/tail.txt") + path_ino = create_target_info("#{@tmp_dir}/tail.txt") d.run(expect_emits: 1, shutdown: false) do - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| f.puts "test3\n"} + File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} end - cleanup_file("#{TMP_DIR}/tail.txt") - File.open("#{TMP_DIR}/tail.txt", "wb") {|f| + cleanup_file("#{@tmp_dir}/tail.txt") + File.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test3" f.puts "test4" } - new_path_ino = create_target_info("#{TMP_DIR}/tail.txt") + new_path_ino = create_target_info("#{@tmp_dir}/tail.txt") Timecop.travel(now + 10) do sleep 3 @@ -2229,19 +2195,19 @@ def test_should_create_new_watcher_for_new_file_with_same_name end def test_truncate_file_with_follow_inodes - config = COMMON_FOLLOW_INODE_CONFIG + config = common_follow_inode_config d = create_driver(config, false) - File.open("#{TMP_DIR}/tail.txt", "wb") {|f| + File.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } d.run(expect_emits: 3, shutdown: false) do - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| f.puts "test3\n"} + File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3\n"} sleep 2 - File.open("#{TMP_DIR}/tail.txt", "w+b") {|f| f.puts "test4\n"} + File.open("#{@tmp_dir}/tail.txt", "w+b") {|f| f.puts "test4\n"} end events = d.events @@ -2255,15 +2221,15 @@ def test_truncate_file_with_follow_inodes # issue #3464 def test_should_replace_target_info - File.open("#{TMP_DIR}/tail.txt", "wb") {|f| + File.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1\n" } - target_info = create_target_info("#{TMP_DIR}/tail.txt") + target_info = create_target_info("#{@tmp_dir}/tail.txt") inodes = [] config = config_element("ROOT", "", { - "path" => "#{TMP_DIR}/tail.txt*", - "pos_file" => "#{TMP_DIR}/tail.pos", + "path" => "#{@tmp_dir}/tail.txt*", + "pos_file" => "#{@tmp_dir}/tail.pos", "tag" => "t1", "refresh_interval" => "60s", "read_from_head" => "true", @@ -2281,8 +2247,8 @@ def test_should_replace_target_info end assert_equal([target_info.ino], inodes) - cleanup_file("#{TMP_DIR}/tail.txt") - File.open("#{TMP_DIR}/tail.txt", "wb") {|f| f.puts "test2\n"} + cleanup_file("#{@tmp_dir}/tail.txt") + File.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test2\n"} while d.events.size < 2 do sleep 0.1 @@ -2290,7 +2256,7 @@ def test_should_replace_target_info inodes = d.instance.instance_variable_get(:@tails).values.collect do |tw| tw.ino end - new_target_info = create_target_info("#{TMP_DIR}/tail.txt") + new_target_info = create_target_info("#{@tmp_dir}/tail.txt") assert_not_equal(target_info.ino, new_target_info.ino) assert_equal([new_target_info.ino], inodes) end @@ -2299,7 +2265,7 @@ def test_should_replace_target_info sub_test_case "tail_path" do def test_tail_path_with_singleline - File.open("#{TMP_DIR}/tail.txt", "wb") {|f| + File.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "test1" f.puts "test2" } @@ -2307,7 +2273,7 @@ def test_tail_path_with_singleline d = create_driver(SINGLE_LINE_CONFIG + config_element("", "", { "path_key" => "path" })) d.run(expect_emits: 1) do - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| + File.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } @@ -2316,12 +2282,12 @@ def test_tail_path_with_singleline events = d.events assert_equal(true, events.length > 0) events.each do |emit| - assert_equal("#{TMP_DIR}/tail.txt", emit[2]["path"]) + assert_equal("#{@tmp_dir}/tail.txt", emit[2]["path"]) end end def test_tail_path_with_multiline_with_firstline - File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } + File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } config = config_element("", "", { "path_key" => "path", @@ -2331,7 +2297,7 @@ def test_tail_path_with_multiline_with_firstline }) d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{TMP_DIR}/tail.txt", "ab") { |f| + File.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "f test1" f.puts "s test2" f.puts "f test3" @@ -2346,12 +2312,12 @@ def test_tail_path_with_multiline_with_firstline events = d.events assert_equal(4, events.length) events.each do |emit| - assert_equal("#{TMP_DIR}/tail.txt", emit[2]["path"]) + assert_equal("#{@tmp_dir}/tail.txt", emit[2]["path"]) end end def test_tail_path_with_multiline_without_firstline - File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } + File.open("#{@tmp_dir}/tail.txt", "wb") { |f| } config = config_element("", "", { "path_key" => "path", @@ -2362,7 +2328,7 @@ def test_tail_path_with_multiline_without_firstline }) d = create_driver(config) d.run(expect_emits: 1) do - File.open("#{TMP_DIR}/tail.txt", "ab") { |f| + File.open("#{@tmp_dir}/tail.txt", "ab") { |f| f.puts "foo 1" f.puts "bar 1" f.puts "baz 1" @@ -2372,7 +2338,7 @@ def test_tail_path_with_multiline_without_firstline events = d.events assert(events.length > 0) events.each do |emit| - assert_equal("#{TMP_DIR}/tail.txt", emit[2]["path"]) + assert_equal("#{@tmp_dir}/tail.txt", emit[2]["path"]) end end @@ -2380,7 +2346,7 @@ def test_tail_path_with_multiline_with_multiple_paths if ENV["APPVEYOR"] && Fluent.windows? omit "This testcase is unstable on AppVeyor." end - files = ["#{TMP_DIR}/tail1.txt", "#{TMP_DIR}/tail2.txt"] + files = ["#{@tmp_dir}/tail1.txt", "#{@tmp_dir}/tail2.txt"] files.each { |file| File.open(file, "wb") { |f| } } config = config_element("", "", { @@ -2414,20 +2380,20 @@ def test_tail_path_with_multiline_with_multiple_paths def test_limit_recently_modified now = Time.new(2010, 1, 2, 3, 4, 5) - FileUtils.touch("#{TMP_DIR}/tail_unwatch.txt", mtime: (now - 3601)) - FileUtils.touch("#{TMP_DIR}/tail_watch1.txt", mtime: (now - 3600)) - FileUtils.touch("#{TMP_DIR}/tail_watch2.txt", mtime: now) + FileUtils.touch("#{@tmp_dir}/tail_unwatch.txt", mtime: (now - 3601)) + FileUtils.touch("#{@tmp_dir}/tail_watch1.txt", mtime: (now - 3600)) + FileUtils.touch("#{@tmp_dir}/tail_watch2.txt", mtime: now) config = config_element('', '', { 'tag' => 'tail', - 'path' => "#{TMP_DIR}/*.txt", + 'path' => "#{@tmp_dir}/*.txt", 'format' => 'none', 'limit_recently_modified' => '3600s' }) expected_files = [ - create_target_info("#{TMP_DIR}/tail_watch1.txt"), - create_target_info("#{TMP_DIR}/tail_watch2.txt") + create_target_info("#{@tmp_dir}/tail_watch1.txt"), + create_target_info("#{@tmp_dir}/tail_watch2.txt") ] Timecop.freeze(now) do @@ -2437,7 +2403,7 @@ def test_limit_recently_modified end def test_skip_refresh_on_startup - FileUtils.touch("#{TMP_DIR}/tail.txt") + FileUtils.touch("#{@tmp_dir}/tail.txt") config = config_element('', '', { 'format' => 'none', 'refresh_interval' => 1, @@ -2452,7 +2418,7 @@ def test_skip_refresh_on_startup end def test_ENOENT_error_after_setup_watcher - path = "#{TMP_DIR}/tail.txt" + path = "#{@tmp_dir}/tail.txt" FileUtils.touch(path) config = config_element('', '', { 'format' => 'none', @@ -2477,10 +2443,10 @@ def test_ENOENT_error_after_setup_watcher def test_EACCES_error_after_setup_watcher omit "Cannot test with root user" if Process::UID.eid == 0 - path = "#{TMP_DIR}/noaccess/tail.txt" + path = "#{@tmp_dir}/noaccess/tail.txt" begin - FileUtils.mkdir_p("#{TMP_DIR}/noaccess") - FileUtils.chmod(0755, "#{TMP_DIR}/noaccess") + FileUtils.mkdir_p("#{@tmp_dir}/noaccess") + FileUtils.chmod(0755, "#{@tmp_dir}/noaccess") FileUtils.touch(path) config = config_element('', '', { 'tag' => "tail", @@ -2489,7 +2455,7 @@ def test_EACCES_error_after_setup_watcher }) d = create_driver(config, false) mock.proxy(d.instance).existence_path do |hash| - FileUtils.chmod(0000, "#{TMP_DIR}/noaccess") + FileUtils.chmod(0000, "#{@tmp_dir}/noaccess") hash end.twice assert_nothing_raised do @@ -2500,14 +2466,14 @@ def test_EACCES_error_after_setup_watcher end ensure d.instance_shutdown if d && d.instance - if File.exist?("#{TMP_DIR}/noaccess") - FileUtils.chmod(0755, "#{TMP_DIR}/noaccess") - FileUtils.rm_rf("#{TMP_DIR}/noaccess") + if File.exist?("#{@tmp_dir}/noaccess") + FileUtils.chmod(0755, "#{@tmp_dir}/noaccess") + FileUtils.rm_rf("#{@tmp_dir}/noaccess") end end unless Fluent.windows? def test_EACCES - path = "#{TMP_DIR}/tail.txt" + path = "#{@tmp_dir}/tail.txt" FileUtils.touch(path) config = config_element('', '', { 'format' => 'none', @@ -2525,7 +2491,7 @@ def test_EACCES end def test_shutdown_timeout - File.open("#{TMP_DIR}/tail.txt", "wb") do |f| + File.open("#{@tmp_dir}/tail.txt", "wb") do |f| (1024 * 1024 * 5).times do f.puts "{\"test\":\"fizzbuzz\"}" end @@ -2558,7 +2524,7 @@ def test_shutdown_timeout def test_lines_collected_with_no_throttling(data) file, num_lines, msg = data - pattern = "/^#{TMP_DIR}\/(?.+)\.log$/" + pattern = "/^#{@tmp_dir}\/(?.+)\.log$/" rule = create_rule_directive({ "file" => "/test.*/", }, -1) @@ -2567,7 +2533,7 @@ def test_lines_collected_with_no_throttling(data) conf = ROOT_CONFIG + group + path_element + CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG - File.open("#{TMP_DIR}/#{file}", 'wb') do |f| + File.open("#{@tmp_dir}/#{file}", 'wb') do |f| num_lines.times do f.puts "#{msg}\n" end @@ -2608,10 +2574,10 @@ def test_lines_collected_with_no_throttling(data) "podname"=> "/podname.+/", }, limit) path_element = create_path_element(file) - conf = ROOT_CONFIG + create_group_directive(TAILING_GROUP_PATTERN, "#{rate_period}s", rule) + path_element + SINGLE_LINE_CONFIG + CONFIG_READ_FROM_HEAD + conf = ROOT_CONFIG + create_group_directive(tailing_group_pattern, "#{rate_period}s", rule) + path_element + SINGLE_LINE_CONFIG + CONFIG_READ_FROM_HEAD d = create_driver(conf, false) - file_path = "#{TMP_DIR}/#{file}" + file_path = "#{@tmp_dir}/#{file}" File.open(file_path, 'wb') do |f| num_lines.times do