diff --git a/lib/fluent/plugin/out_secondary_file.rb b/lib/fluent/plugin/out_secondary_file.rb
index e294a2e379..5cadf94984 100644
--- a/lib/fluent/plugin/out_secondary_file.rb
+++ b/lib/fluent/plugin/out_secondary_file.rb
@@ -64,31 +64,31 @@ def configure(conf)
     end
 
     def multi_workers_ready?
-      ### TODO: add hack to synchronize for multi workers
       true
     end
 
+    # Writes `chunk` to the file selected by generate_path, creating the
+    # parent directory first, and returns the path that was written.
     def write(chunk)
       path_without_suffix = extract_placeholders(@path_without_suffix, chunk)
-      path = generate_path(path_without_suffix)
-      FileUtils.mkdir_p File.dirname(path), mode: @dir_perm
-
-      case @compress
-      when :text
-        File.open(path, "ab", @file_perm) {|f|
-          f.flock(File::LOCK_EX)
-          chunk.write_to(f)
-        }
-      when :gzip
-        File.open(path, "ab", @file_perm) {|f|
-          f.flock(File::LOCK_EX)
-          gz = Zlib::GzipWriter.new(f)
-          chunk.write_to(gz)
-          gz.close
-        }
+      generate_path(path_without_suffix) do |path|
+        FileUtils.mkdir_p File.dirname(path), mode: @dir_perm
+
+        case @compress
+        when :text
+          File.open(path, "ab", @file_perm) {|f|
+            f.flock(File::LOCK_EX)
+            chunk.write_to(f)
+          }
+        when :gzip
+          File.open(path, "ab", @file_perm) {|f|
+            f.flock(File::LOCK_EX)
+            gz = Zlib::GzipWriter.new(f)
+            chunk.write_to(gz)
+            gz.close
+          }
+        end
       end
-
-      path
     end
 
     private
@@ -117,15 +117,45 @@ def has_time_format?(str)
 
+    # Picks the output path for `path_without_suffix` and yields it while
+    # holding the lock from synchronize_path for that path.
+    #
+    # In append mode the path is always "<prefix><suffix>". Otherwise the
+    # first "<prefix>.<i><suffix>" (i = 0, 1, ...) that does not exist yet is
+    # used. Returns the chosen path.
     def generate_path(path_without_suffix)
       if @append
-        "#{path_without_suffix}#{@suffix}"
-      else
+        path = "#{path_without_suffix}#{@suffix}"
+        synchronize_path(path) do
+          yield path
+        end
+        return path
+      end
+
+      # Declared here on purpose: `path` is assigned inside the `loop` block
+      # below and must stay visible after that block ends.
+      path = nil
+      begin
         i = 0
         loop do
           path = "#{path_without_suffix}.#{i}#{@suffix}"
-          return path unless File.exist?(path)
+          break unless File.exist?(path)
           i += 1
         end
+        synchronize_path(path) do
+          # If multiple processes or threads select the same path and another
+          # one entered this locking block first, the file should already
+          # exist and this one should retry to find a new path.
+          raise FileAlreadyExist if File.exist?(path)
+          yield path
+        end
+      rescue FileAlreadyExist
+        retry
       end
+      path
+    end
+
+    # Raised inside generate_path when the selected path already exists by
+    # the time the lock is held; rescued there to retry with the next index.
+    class FileAlreadyExist < StandardError
     end
   end
 end
diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb
index 5dd5255652..690033eee4 100644
--- a/lib/fluent/plugin/output.rb
+++ b/lib/fluent/plugin/output.rb
@@ -198,6 +198,7 @@ def rollback_count
       def initialize
         super
         @counter_mutex = Mutex.new
+        @flush_thread_mutex = Mutex.new
         @buffering = false
         @delayed_commit = false
         @as_secondary = false
@@ -597,6 +598,45 @@ def terminate
         super
       end
 
+      # Returns the flush thread count that applies to this instance:
+      # 0 when not buffering; otherwise this plugin's own flush_thread_count,
+      # or the primary's flush_thread_count when running as a secondary.
+      def actual_flush_thread_count
+        return 0 unless @buffering
+        return @buffer_config.flush_thread_count unless @as_secondary
+        @primary_instance.buffer_config.flush_thread_count
+      end
+
+      # Ensures that `path` (filename or filepath) is processed
+      # only by the current thread in the current process.
+      # For multiple workers, the lock is shared if `path` is the same value.
+      # For multiple threads, the lock is shared by all threads in the same process.
+      def synchronize_path(path)
+        synchronize_path_in_workers(path) do
+          synchronize_in_threads do
+            yield
+          end
+        end
+      end
+
+      def synchronize_path_in_workers(path)
+        need_worker_lock = system_config.workers > 1 # single worker: no worker lock is taken
+        if need_worker_lock
+          acquire_worker_lock(path) { yield }
+        else
+          yield
+        end
+      end
+
+      def synchronize_in_threads
+        need_thread_lock = actual_flush_thread_count > 1 # zero or one flush thread: mutex is skipped
+        if need_thread_lock
+          @flush_thread_mutex.synchronize { yield }
+        else
+          yield
+        end
+      end
+
       def support_in_v12_style?(feature)
         # for plugins written in v0.12 styles
         case feature
diff --git a/test/plugin/test_output.rb b/test/plugin/test_output.rb
index a04b19d469..7bbd5cb5a9 100644
--- a/test/plugin/test_output.rb
+++ b/test/plugin/test_output.rb
@@ -1072,4 +1072,271 @@ def invoke_slow_flush_log_threshold_test(i)
       }
     end
   end
+
+  sub_test_case "actual_flush_thread_count" do
+    data(
+      "Not buffered",
+      {
+        output_type: :sync,
+        config: config_element(),
+        expected: 0,
+      }
+    )
+    data(
+      "Buffered with single thread",
+      {
+        output_type: :full,
+        config: config_element("ROOT", "", {}, [config_element("buffer", "", {})]),
+        expected: 1,
+      }
+    )
+    data(
+      "Buffered with multiple threads",
+      {
+        output_type: :full,
+        config: config_element("ROOT", "", {}, [config_element("buffer", "", {"flush_thread_count" => 8})]),
+        expected: 8,
+      }
+    )
+    test "actual_flush_thread_count" do |data|
+      o = create_output(data[:output_type])
+      o.configure(data[:config])
+      assert_equal data[:expected], o.actual_flush_thread_count
+    end
+
+    data(
+      "Buffered with single thread",
+      {
+        output_type: :full,
+        config: config_element(
+          "ROOT", "", {},
+          [
+            config_element("buffer", "", {}),
+            config_element("secondary", "", {"@type" => "test", "name" => "test"}),
+          ]
+        ),
+        expected: 1,
+      }
+    )
+    data(
+      "Buffered with multiple threads",
+      {
+        output_type: :full,
+        config: config_element(
+          "ROOT", "", {},
+          [
+            config_element("buffer", "", {"flush_thread_count" => 8}),
+            config_element("secondary", "", {"@type" => "test", "name" => "test"}),
+          ]
+        ),
+        expected: 8,
+      }
+    )
+    test "actual_flush_thread_count for secondary" do |data|
+      primary = create_output(data[:output_type])
+      primary.configure(data[:config])
+      assert_equal data[:expected], primary.secondary.actual_flush_thread_count
+    end
+  end
+
+  sub_test_case "synchronize_path" do
+    # Points FLUENTD_LOCK_DIR at a fresh temporary directory while the test
+    # runs and restores the previous value afterwards.
+    def setup
+      original_lock_dir = ENV['FLUENTD_LOCK_DIR']
+      Dir.mktmpdir do |lock_dir|
+        ENV['FLUENTD_LOCK_DIR'] = lock_dir
+        yield
+      end
+    ensure
+      ENV['FLUENTD_LOCK_DIR'] = original_lock_dir
+    end
+
+    def assert_worker_lock(lock_path, expect_locked)
+      # With LOCK_NB set, flock() returns:
+      # * `false` when the file is already locked.
+      # * `0` when the file is not locked.
+      File.open(lock_path, "w") do |f|
+        if expect_locked
+          assert_equal false, f.flock(File::LOCK_EX|File::LOCK_NB)
+        else
+          assert_equal 0, f.flock(File::LOCK_EX|File::LOCK_NB)
+        end
+      end
+    end
+
+    # Tries to enter synchronize_path from another thread. When the thread
+    # lock is expected to be held, that thread must still be blocked when the
+    # 3-second join times out (Thread#join returns nil on timeout).
+    def assert_thread_lock(output_plugin, expect_locked)
+      t = Thread.new do
+        output_plugin.synchronize_path("test") do
+        end
+      end
+      if expect_locked
+        assert_nil t.join(3)
+      else
+        assert_not_nil t.join(3)
+      end
+    end
+
+    data(
+      "Not buffered with single worker",
+      {
+        output_type: :sync,
+        config: config_element(),
+        workers: 1,
+        expect_worker_lock: false,
+        expect_thread_lock: false,
+      }
+    )
+    data(
+      "Not buffered with multiple workers",
+      {
+        output_type: :sync,
+        config: config_element(),
+        workers: 4,
+        expect_worker_lock: true,
+        expect_thread_lock: false,
+      }
+    )
+    data(
+      "Buffered with single thread and single worker",
+      {
+        output_type: :full,
+        config: config_element("ROOT", "", {}, [config_element("buffer", "", {})]),
+        workers: 1,
+        expect_worker_lock: false,
+        expect_thread_lock: false,
+      }
+    )
+    data(
+      "Buffered with multiple threads and single worker",
+      {
+        output_type: :full,
+        config: config_element("ROOT", "", {}, [config_element("buffer", "", {"flush_thread_count" => 8})]),
+        workers: 1,
+        expect_worker_lock: false,
+        expect_thread_lock: true,
+      }
+    )
+    data(
+      "Buffered with single thread and multiple workers",
+      {
+        output_type: :full,
+        config: config_element("ROOT", "", {}, [config_element("buffer", "", {})]),
+        workers: 4,
+        expect_worker_lock: true,
+        expect_thread_lock: false,
+      }
+    )
+    data(
+      "Buffered with multiple threads and multiple workers",
+      {
+        output_type: :full,
+        config: config_element("ROOT", "", {}, [config_element("buffer", "", {"flush_thread_count" => 8})]),
+        workers: 4,
+        expect_worker_lock: true,
+        expect_thread_lock: true,
+      }
+    )
+    test "synchronize_path" do |data|
+      o = create_output(data[:output_type])
+      o.configure(data[:config])
+      o.system_config_override(workers: data[:workers])
+
+      test_lock_name = "test_lock_name"
+      lock_path = o.get_lock_path(test_lock_name)
+
+      o.synchronize_path(test_lock_name) do
+        assert_worker_lock(lock_path, data[:expect_worker_lock])
+        assert_thread_lock(o, data[:expect_thread_lock])
+      end
+
+      assert_worker_lock(lock_path, false)
+      assert_thread_lock(o, false)
+    end
+
+    data(
+      "Buffered with single thread and single worker",
+      {
+        output_type: :full,
+        config: config_element(
+          "ROOT", "", {},
+          [
+            config_element("buffer", "", {}),
+            config_element("secondary", "", {"@type" => "test", "name" => "test"}),
+          ]
+        ),
+        workers: 1,
+        expect_worker_lock: false,
+        expect_thread_lock: false,
+      }
+    )
+    data(
+      "Buffered with multiple threads and single worker",
+      {
+        output_type: :full,
+        config: config_element(
+          "ROOT", "", {},
+          [
+            config_element("buffer", "", {"flush_thread_count" => 8}),
+            config_element("secondary", "", {"@type" => "test", "name" => "test"}),
+          ]
+        ),
+        workers: 1,
+        expect_worker_lock: false,
+        expect_thread_lock: true,
+      }
+    )
+    data(
+      "Buffered with single thread and multiple workers",
+      {
+        output_type: :full,
+        config: config_element(
+          "ROOT", "", {},
+          [
+            config_element("buffer", "", {}),
+            config_element("secondary", "", {"@type" => "test", "name" => "test"}),
+          ]
+        ),
+        workers: 4,
+        expect_worker_lock: true,
+        expect_thread_lock: false,
+      }
+    )
+    data(
+      "Buffered with multiple threads and multiple workers",
+      {
+        output_type: :full,
+        config: config_element(
+          "ROOT", "", {},
+          [
+            config_element("buffer", "", {"flush_thread_count" => 8}),
+            config_element("secondary", "", {"@type" => "test", "name" => "test"}),
+          ]
+        ),
+        workers: 4,
+        expect_worker_lock: true,
+        expect_thread_lock: true,
+      }
+    )
+    test "synchronize_path for secondary" do |data|
+      primary = create_output(data[:output_type])
+      primary.configure(data[:config])
+      secondary = primary.secondary
+      secondary.system_config_override(workers: data[:workers])
+
+      test_lock_name = "test_lock_name"
+      lock_path = secondary.get_lock_path(test_lock_name)
+
+      secondary.synchronize_path(test_lock_name) do
+        assert_worker_lock(lock_path, data[:expect_worker_lock])
+        assert_thread_lock(secondary, data[:expect_thread_lock])
+      end
+
+      assert_worker_lock(lock_path, false)
+      assert_thread_lock(secondary, false)
+    end
+  end
 end