From 6acf958c00e7a1e2d3030d7a6ccbd8908f5351d9 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Mon, 6 Mar 2023 20:42:06 +0900 Subject: [PATCH 01/10] Fix race condition of out_secondary_file on multiple workers Signed-off-by: Daijiro Fukuda --- lib/fluent/plugin/out_secondary_file.rb | 99 +++++++++++++++++++------ test/command/test_cat.rb | 1 + test/plugin/test_out_secondary_file.rb | 15 ++++ 3 files changed, 93 insertions(+), 22 deletions(-) diff --git a/lib/fluent/plugin/out_secondary_file.rb b/lib/fluent/plugin/out_secondary_file.rb index e294a2e379..5859b28a95 100644 --- a/lib/fluent/plugin/out_secondary_file.rb +++ b/lib/fluent/plugin/out_secondary_file.rb @@ -61,34 +61,40 @@ def configure(conf) @dir_perm = system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION @file_perm = system_config.file_permission || Fluent::DEFAULT_FILE_PERMISSION + @need_worker_lock = system_config.workers > 1 + @need_thread_lock = @primary_instance.buffer_config.flush_thread_count > 1 end def multi_workers_ready? - ### TODO: add hack to synchronize for multi workers true end + def start + super + extend WriteLocker + @write_mutex = Mutex.new + end + def write(chunk) path_without_suffix = extract_placeholders(@path_without_suffix, chunk) - path = generate_path(path_without_suffix) - FileUtils.mkdir_p File.dirname(path), mode: @dir_perm - - case @compress - when :text - File.open(path, "ab", @file_perm) {|f| - f.flock(File::LOCK_EX) - chunk.write_to(f) - } - when :gzip - File.open(path, "ab", @file_perm) {|f| - f.flock(File::LOCK_EX) - gz = Zlib::GzipWriter.new(f) - chunk.write_to(gz) - gz.close - } + return generate_path(path_without_suffix) do |path| + FileUtils.mkdir_p File.dirname(path), mode: @dir_perm + + case @compress + when :text + File.open(path, "ab", @file_perm) {|f| + f.flock(File::LOCK_EX) + chunk.write_to(f) + } + when :gzip + File.open(path, "ab", @file_perm) {|f| + f.flock(File::LOCK_EX) + gz = Zlib::GzipWriter.new(f) + chunk.write_to(gz) + gz.close + } + end end - - path end private @@ -117,14 +123,63 @@ def has_time_format?(str) def generate_path(path_without_suffix) if @append - "#{path_without_suffix}#{@suffix}" - else + path = "#{path_without_suffix}#{@suffix}" + lock_if_need(path) do + yield path + end + return path + end + + begin i = 0 loop do path = "#{path_without_suffix}.#{i}#{@suffix}" - return path unless File.exist?(path) + break unless File.exist?(path) i += 1 end + lock_if_need(path) do + # If multiple processes or threads select the same path and another + # one entered this locking block first, the file should already + # exist and this one should retry to find new path. + raise FileAlreadyExist if File.exist?(path) + yield path + end + rescue FileAlreadyExist + retry + end + return path + end + + class FileAlreadyExist < StandardError + end + + module WriteLocker + def lock_if_need(path) + get_worker_lock_if_need(path) do + get_thread_lock_if_need do + yield + end + end + end + + def get_worker_lock_if_need(path) + unless @need_worker_lock + yield + return + end + acquire_worker_lock(path) do + yield + end + end + + def get_thread_lock_if_need + unless @need_thread_lock + yield + return + end + @write_mutex.synchronize do + yield + end end end end diff --git a/test/command/test_cat.rb b/test/command/test_cat.rb index 51c1a20bbb..9e0c6e8d82 100644 --- a/test/command/test_cat.rb +++ b/test/command/test_cat.rb @@ -83,6 +83,7 @@ def test_cat_json sub_test_case "msgpack" do def test_cat_secondary_file d = create_secondary_driver + d.instance_start path = d.instance.write(@chunk) d = create_driver d.run(expect_records: 1) do diff --git a/test/plugin/test_out_secondary_file.rb b/test/plugin/test_out_secondary_file.rb index 05091b2853..139e3b7b67 100644 --- a/test/plugin/test_out_secondary_file.rb +++ b/test/plugin/test_out_secondary_file.rb @@ -150,6 +150,7 @@ def create_chunk(primary, metadata, es) test 'should output compressed file when compress option is gzip' do d = create_driver(CONFIG, @primary) + d.instance_start path = d.instance.write(@chunk) assert_equal "#{TMP_DIR}/out_file_test.0.gz", path @@ -161,6 +162,7 @@ def create_chunk(primary, metadata, es) directory #{TMP_DIR}/ basename out_file_test ], @primary) + d.instance_start msgpack_binary = @es.to_msgpack_stream.force_encoding('ASCII-8BIT') @@ -175,6 +177,7 @@ def create_chunk(primary, metadata, es) test 'path should be incremental when append option is false' do d = create_driver(CONFIG, @primary) + d.instance_start packed_value = @es.to_msgpack_stream.force_encoding('ASCII-8BIT') 5.times do |i| @@ -186,6 +189,7 @@ def create_chunk(primary, metadata, es) test 'path should be unchanged when append option is true' do d = create_driver(CONFIG + %[append true], @primary) + d.instance_start packed_value = @es.to_msgpack_stream.force_encoding('ASCII-8BIT') [*1..5].each do |i| @@ -240,6 +244,7 @@ def create_chunk(primary, metadata, es) test 'normal path when compress option is gzip' do d = create_driver + d.instance_start path = d.instance.write(@c) assert_equal "#{TMP_DIR}/out_file_test.0.gz", path end @@ -249,6 +254,7 @@ def create_chunk(primary, metadata, es) directory #{TMP_DIR} basename out_file_test ] + d.instance_start path = d.instance.write(@c) assert_equal "#{TMP_DIR}/out_file_test.0", path end @@ -258,6 +264,7 @@ def create_chunk(primary, metadata, es) directory #{TMP_DIR} append true ] + d.instance_start path = d.instance.write(@c) assert_equal "#{TMP_DIR}/dump.bin", path end @@ -267,6 +274,7 @@ def create_chunk(primary, metadata, es) directory #{TMP_DIR} basename out_file_chunk_id_${chunk_id} ] + d.instance_start path = d.instance.write(@c) if File.basename(path) =~ /out_file_chunk_id_([-_.@a-zA-Z0-9].*).0/ unique_id = Fluent::UniqueId.hex(Fluent::UniqueId.generate) @@ -321,6 +329,7 @@ def create_chunk(primary, metadata, es) basename cool_${tag} compress gzip ], primary) + d.instance_start m = primary.buffer.new_metadata(tag: 'test.dummy') c = create_chunk(primary, m, @es) @@ -337,6 +346,7 @@ def create_chunk(primary, metadata, es) basename cool_${tag[0]}_${tag[1]} compress gzip ], primary) + d.instance_start m = primary.buffer.new_metadata(tag: 'test.dummy') c = create_chunk(primary, m, @es) @@ -355,6 +365,7 @@ def create_chunk(primary, metadata, es) basename cool_%Y%m%d%H compress gzip ], primary) + d.instance_start m = primary.buffer.new_metadata(timekey: event_time("2011-01-02 13:14:15 UTC")) c = create_chunk(primary, m, @es) @@ -373,6 +384,7 @@ def create_chunk(primary, metadata, es) basename cool_%Y%m%d%H compress gzip ], primary) + d.instance_start m = primary.buffer.new_metadata(timekey: event_time("2011-01-02 13:14:15 UTC")) c = create_chunk(primary, m, @es) @@ -389,6 +401,7 @@ def create_chunk(primary, metadata, es) basename cool_${test1} compress gzip ], primary) + d.instance_start m = primary.buffer.new_metadata(variables: { "test1".to_sym => "dummy" }) c = create_chunk(primary, m, @es) @@ -421,6 +434,7 @@ def create_chunk(primary, metadata, es) basename cool_%Y%m%d%H_${tag}_${test1} compress gzip ], primary) + d.instance_start m = primary.buffer.new_metadata( timekey: event_time("2011-01-02 13:14:15 UTC"), @@ -443,6 +457,7 @@ def create_chunk(primary, metadata, es) directory #{TMP_DIR}/%Y%m%d%H/${tag}/${test1} compress gzip ], primary) + d.instance_start m = primary.buffer.new_metadata( timekey: event_time("2011-01-02 13:14:15 UTC"), From c327b8614fc246fb39c59a5a1da056695ff12590 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Tue, 7 Mar 2023 11:44:17 +0900 Subject: [PATCH 02/10] Apply suggestions: Remove needless return Signed-off-by: Daijiro Fukuda Co-authored-by: Takuro Ashie --- lib/fluent/plugin/out_secondary_file.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin/out_secondary_file.rb b/lib/fluent/plugin/out_secondary_file.rb index 5859b28a95..08f027b8de 100644 --- a/lib/fluent/plugin/out_secondary_file.rb +++ b/lib/fluent/plugin/out_secondary_file.rb @@ -77,7 +77,7 @@ def start def write(chunk) path_without_suffix = extract_placeholders(@path_without_suffix, chunk) - return generate_path(path_without_suffix) do |path| + generate_path(path_without_suffix) do |path| FileUtils.mkdir_p File.dirname(path), mode: @dir_perm case @compress @@ -147,7 +147,7 @@ def generate_path(path_without_suffix) rescue FileAlreadyExist retry end - return path + path end class FileAlreadyExist < StandardError From ce32ac57a07121d69adf30d3b543ea11041ce613 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Tue, 7 Mar 2023 16:28:02 +0900 Subject: [PATCH 03/10] Transfer lock feature to Output class so that other plugins can use this feature. Signed-off-by: Daijiro Fukuda --- lib/fluent/plugin/out_secondary_file.rb | 38 ---------------------- lib/fluent/plugin/output.rb | 42 +++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 38 deletions(-) diff --git a/lib/fluent/plugin/out_secondary_file.rb b/lib/fluent/plugin/out_secondary_file.rb index 08f027b8de..85d9b000e7 100644 --- a/lib/fluent/plugin/out_secondary_file.rb +++ b/lib/fluent/plugin/out_secondary_file.rb @@ -61,20 +61,12 @@ def configure(conf) @dir_perm = system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION @file_perm = system_config.file_permission || Fluent::DEFAULT_FILE_PERMISSION - @need_worker_lock = system_config.workers > 1 - @need_thread_lock = @primary_instance.buffer_config.flush_thread_count > 1 end def multi_workers_ready? true end - def start - super - extend WriteLocker - @write_mutex = Mutex.new - end - def write(chunk) path_without_suffix = extract_placeholders(@path_without_suffix, chunk) generate_path(path_without_suffix) do |path| @@ -152,35 +144,5 @@ def generate_path(path_without_suffix) class FileAlreadyExist < StandardError end - - module WriteLocker - def lock_if_need(path) - get_worker_lock_if_need(path) do - get_thread_lock_if_need do - yield - end - end - end - - def get_worker_lock_if_need(path) - unless @need_worker_lock - yield - return - end - acquire_worker_lock(path) do - yield - end - end - - def get_thread_lock_if_need - unless @need_thread_lock - yield - return - end - @write_mutex.synchronize do - yield - end - end - end end end diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 5dd5255652..8e821388f5 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -198,6 +198,7 @@ def rollback_count def initialize super @counter_mutex = Mutex.new + @flush_thread_mutex = Mutex.new @buffering = false @delayed_commit = false @as_secondary = false @@ -597,6 +598,47 @@ def terminate super end + def actual_flush_thread_count + return 0 unless @buffering + return @buffer_config.flush_thread_count unless @as_secondary + @primary_instance.buffer_config.flush_thread_count + end + + # Run the passed block in the appropriate lock condition for multiple threads and workers. + # The lock between workers is made for every `worker_lock_name`. + # (For multiple workers, the lock is shared if `worker_lock_name` is the same value). + # For multiple threads, `worker_lock_name` is not used, and the lock is shared by all + # threads in the same process. + def lock_if_need(worker_lock_name) + get_worker_lock_if_need(worker_lock_name) do + get_flush_thread_lock_if_need do + yield + end + end + end + + def get_worker_lock_if_need(name) + need_worker_lock = system_config.workers > 1 + unless need_worker_lock + yield + return + end + acquire_worker_lock(name) do + yield + end + end + + def get_flush_thread_lock_if_need + need_thread_lock = actual_flush_thread_count > 1 + unless need_thread_lock + yield + return + end + @flush_thread_mutex.synchronize do + yield + end + end + def support_in_v12_style?(feature) # for plugins written in v0.12 styles case feature From bebc6db22fa28934aa33d2f0b084e7bd70618578 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Tue, 7 Mar 2023 18:54:56 +0900 Subject: [PATCH 04/10] Apply suggestions: Refactor code shape Signed-off-by: Daijiro Fukuda Co-authored-by: Takuro Ashie --- lib/fluent/plugin/output.rb | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 8e821388f5..b2e5b1c94f 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -619,22 +619,18 @@ def lock_if_need(worker_lock_name) def get_worker_lock_if_need(name) need_worker_lock = system_config.workers > 1 - unless need_worker_lock - yield - return - end - acquire_worker_lock(name) do + if need_worker_lock + acquire_worker_lock(name) { yield } + else yield end end def get_flush_thread_lock_if_need need_thread_lock = actual_flush_thread_count > 1 - unless need_thread_lock - yield - return - end - @flush_thread_mutex.synchronize do + if need_thread_lock + @flush_thread_mutex.synchronize { yeild } + else yield end end From 60829a993f630a981b20599af19eb434788cd6b1 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Tue, 7 Mar 2023 19:00:15 +0900 Subject: [PATCH 05/10] Fix typo Signed-off-by: Daijiro Fukuda --- lib/fluent/plugin/output.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index b2e5b1c94f..40f827cf7d 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -629,7 +629,7 @@ def get_worker_lock_if_need(name) def get_flush_thread_lock_if_need need_thread_lock = actual_flush_thread_count > 1 if need_thread_lock - @flush_thread_mutex.synchronize { yeild } + @flush_thread_mutex.synchronize { yield } else yield end From 8f95c3497eb5228cda0d16c918b9a7998c7a700d Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Tue, 7 Mar 2023 19:02:22 +0900 Subject: [PATCH 06/10] Refactor method name to unify the existing method: `acquire_worker_lock`. Signed-off-by: Daijiro Fukuda Co-authored-by: Takuro Ashie --- lib/fluent/plugin/out_secondary_file.rb | 4 ++-- lib/fluent/plugin/output.rb | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/lib/fluent/plugin/out_secondary_file.rb b/lib/fluent/plugin/out_secondary_file.rb index 85d9b000e7..1dff34256c 100644 --- a/lib/fluent/plugin/out_secondary_file.rb +++ b/lib/fluent/plugin/out_secondary_file.rb @@ -116,7 +116,7 @@ def has_time_format?(str) def generate_path(path_without_suffix) if @append path = "#{path_without_suffix}#{@suffix}" - lock_if_need(path) do + acquire_lock_if_need(path) do yield path end return path @@ -129,7 +129,7 @@ def generate_path(path_without_suffix) break unless File.exist?(path) i += 1 end - lock_if_need(path) do + acquire_lock_if_need(path) do # If multiple processes or threads select the same path and another # one entered this locking block first, the file should already # exist and this one should retry to find new path. diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 40f827cf7d..f89575b0cd 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -609,15 +609,15 @@ def actual_flush_thread_count # (For multiple workers, the lock is shared if `worker_lock_name` is the same value). # For multiple threads, `worker_lock_name` is not used, and the lock is shared by all # threads in the same process. - def lock_if_need(worker_lock_name) - get_worker_lock_if_need(worker_lock_name) do - get_flush_thread_lock_if_need do + def acquire_lock_if_need(worker_lock_name) + acquire_worker_lock_if_need(worker_lock_name) do + acquire_flush_thread_lock_if_need do yield end end end - def get_worker_lock_if_need(name) + def acquire_worker_lock_if_need(name) need_worker_lock = system_config.workers > 1 if need_worker_lock acquire_worker_lock(name) { yield } @@ -626,7 +626,7 @@ def get_worker_lock_if_need(name) end end - def get_flush_thread_lock_if_need + def acquire_flush_thread_lock_if_need need_thread_lock = actual_flush_thread_count > 1 if need_thread_lock @flush_thread_mutex.synchronize { yield } From edcbd4b6a9cf2e83c2c88e34e7a103f3a4786314 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Tue, 7 Mar 2023 19:06:00 +0900 Subject: [PATCH 07/10] Revert unrelated fix Signed-off-by: Daijiro Fukuda --- test/command/test_cat.rb | 1 - test/plugin/test_out_secondary_file.rb | 15 --------------- 2 files changed, 16 deletions(-) diff --git a/test/command/test_cat.rb b/test/command/test_cat.rb index 9e0c6e8d82..51c1a20bbb 100644 --- a/test/command/test_cat.rb +++ b/test/command/test_cat.rb @@ -83,7 +83,6 @@ def test_cat_json sub_test_case "msgpack" do def test_cat_secondary_file d = create_secondary_driver - d.instance_start path = d.instance.write(@chunk) d = create_driver d.run(expect_records: 1) do diff --git a/test/plugin/test_out_secondary_file.rb b/test/plugin/test_out_secondary_file.rb index 139e3b7b67..05091b2853 100644 --- a/test/plugin/test_out_secondary_file.rb +++ b/test/plugin/test_out_secondary_file.rb @@ -150,7 +150,6 @@ def create_chunk(primary, metadata, es) test 'should output compressed file when compress option is gzip' do d = create_driver(CONFIG, @primary) - d.instance_start path = d.instance.write(@chunk) assert_equal "#{TMP_DIR}/out_file_test.0.gz", path @@ -162,7 +161,6 @@ def create_chunk(primary, metadata, es) directory #{TMP_DIR}/ basename out_file_test ], @primary) - d.instance_start msgpack_binary = @es.to_msgpack_stream.force_encoding('ASCII-8BIT') @@ -177,7 +175,6 @@ def create_chunk(primary, metadata, es) test 'path should be incremental when append option is false' do d = create_driver(CONFIG, @primary) - d.instance_start packed_value = @es.to_msgpack_stream.force_encoding('ASCII-8BIT') 5.times do |i| @@ -189,7 +186,6 @@ def create_chunk(primary, metadata, es) test 'path should be unchanged when append option is true' do d = create_driver(CONFIG + %[append true], @primary) - d.instance_start packed_value = @es.to_msgpack_stream.force_encoding('ASCII-8BIT') [*1..5].each do |i| @@ -244,7 +240,6 @@ def create_chunk(primary, metadata, es) test 'normal path when compress option is gzip' do d = create_driver - d.instance_start path = d.instance.write(@c) assert_equal "#{TMP_DIR}/out_file_test.0.gz", path end @@ -254,7 +249,6 @@ def create_chunk(primary, metadata, es) directory #{TMP_DIR} basename out_file_test ] - d.instance_start path = d.instance.write(@c) assert_equal "#{TMP_DIR}/out_file_test.0", path end @@ -264,7 +258,6 @@ def create_chunk(primary, metadata, es) directory #{TMP_DIR} append true ] - d.instance_start path = d.instance.write(@c) assert_equal "#{TMP_DIR}/dump.bin", path end @@ -274,7 +267,6 @@ def create_chunk(primary, metadata, es) directory #{TMP_DIR} basename out_file_chunk_id_${chunk_id} ] - d.instance_start path = d.instance.write(@c) if File.basename(path) =~ /out_file_chunk_id_([-_.@a-zA-Z0-9].*).0/ unique_id = Fluent::UniqueId.hex(Fluent::UniqueId.generate) @@ -329,7 +321,6 @@ def create_chunk(primary, metadata, es) basename cool_${tag} compress gzip ], primary) - d.instance_start m = primary.buffer.new_metadata(tag: 'test.dummy') c = create_chunk(primary, m, @es) @@ -346,7 +337,6 @@ def create_chunk(primary, metadata, es) basename cool_${tag[0]}_${tag[1]} compress gzip ], primary) - d.instance_start m = primary.buffer.new_metadata(tag: 'test.dummy') c = create_chunk(primary, m, @es) @@ -365,7 +355,6 @@ def create_chunk(primary, metadata, es) basename cool_%Y%m%d%H compress gzip ], primary) - d.instance_start m = primary.buffer.new_metadata(timekey: event_time("2011-01-02 13:14:15 UTC")) c = create_chunk(primary, m, @es) @@ -384,7 +373,6 @@ def create_chunk(primary, metadata, es) basename cool_%Y%m%d%H compress gzip ], primary) - d.instance_start m = primary.buffer.new_metadata(timekey: event_time("2011-01-02 13:14:15 UTC")) c = create_chunk(primary, m, @es) @@ -401,7 +389,6 @@ def create_chunk(primary, metadata, es) basename cool_${test1} compress gzip ], primary) - d.instance_start m = primary.buffer.new_metadata(variables: { "test1".to_sym => "dummy" }) c = create_chunk(primary, m, @es) @@ -434,7 +421,6 @@ def create_chunk(primary, metadata, es) basename cool_%Y%m%d%H_${tag}_${test1} compress gzip ], primary) - d.instance_start m = primary.buffer.new_metadata( timekey: event_time("2011-01-02 13:14:15 UTC"), @@ -457,7 +443,6 @@ def create_chunk(primary, metadata, es) directory #{TMP_DIR}/%Y%m%d%H/${tag}/${test1} compress gzip ], primary) - d.instance_start m = primary.buffer.new_metadata( timekey: event_time("2011-01-02 13:14:15 UTC"), From f23e7e689c813a3040610e8f6b5958be3a369c28 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Wed, 8 Mar 2023 09:30:18 +0900 Subject: [PATCH 08/10] Add tests Signed-off-by: Daijiro Fukuda --- test/plugin/test_output.rb | 259 +++++++++++++++++++++++++++++++++++++ 1 file changed, 259 insertions(+) diff --git a/test/plugin/test_output.rb b/test/plugin/test_output.rb index a04b19d469..34bf439bc0 100644 --- a/test/plugin/test_output.rb +++ b/test/plugin/test_output.rb @@ -1072,4 +1072,263 @@ def invoke_slow_flush_log_threshold_test(i) } end end + + sub_test_case "actual_flush_thread_count" do + data( + "Not buffered", + { + output_type: :sync, + config: config_element(), + expected: 0, + } + ) + data( + "Buffered with singile thread", + { + output_type: :full, + config: config_element("ROOT", "", {}, [config_element("buffer", "", {})]), + expected: 1, + } + ) + data( + "Buffered with multiple threads", + { + output_type: :full, + config: config_element("ROOT", "", {}, [config_element("buffer", "", {"flush_thread_count" => 8})]), + expected: 8, + } + ) + test "actual_flush_thread_count" do |data| + o = create_output(data[:output_type]) + o.configure(data[:config]) + assert_equal data[:expected], o.actual_flush_thread_count + end + + data( + "Buffered with singile thread", + { + output_type: :full, + config: config_element( + "ROOT", "", {}, + [ + config_element("buffer", "", {}), + config_element("secondary", "", {"@type" => "test", "name" => "test"}), + ] + ), + expected: 1, + } + ) + data( + "Buffered with multiple threads", + { + output_type: :full, + config: config_element( + "ROOT", "", {}, + [ + config_element("buffer", "", {"flush_thread_count" => 8}), + config_element("secondary", "", {"@type" => "test", "name" => "test"}), + ] + ), + expected: 8, + } + ) + test "actual_flush_thread_count for secondary" do |data| + primary = create_output(data[:output_type]) + primary.configure(data[:config]) + assert_equal data[:expected], primary.secondary.actual_flush_thread_count + end + end + + sub_test_case "acquire_lock_if_need" do + def setup + Dir.mktmpdir do |lock_dir| + ENV['FLUENTD_LOCK_DIR'] = lock_dir + yield + end + end + + def assert_worker_lock(lock_path, expect_locked) + # With LOCK_NB set, flock() returns: + # * `false` when the file is already locked. + # * `0` when the file is not locked. + File.open(lock_path, "w") do |f| + if expect_locked + assert_equal false, f.flock(File::LOCK_EX|File::LOCK_NB) + else + assert_equal 0, f.flock(File::LOCK_EX|File::LOCK_NB) + end + end + end + + def assert_thread_lock(output_plugin, expect_locked) + t = Thread.new do + output_plugin.acquire_lock_if_need("test") do + end + end + if expect_locked + assert_nil t.join(3) + else + assert_not_nil t.join(3) + end + end + + data( + "Not buffered with single worker", + { + output_type: :sync, + config: config_element(), + workers: 1, + expect_worker_lock: false, + expect_thread_lock: false, + } + ) + data( + "Not buffered with multiple workers", + { + output_type: :sync, + config: config_element(), + workers: 4, + expect_worker_lock: true, + expect_thread_lock: false, + } + ) + data( + "Buffered with single thread and single worker", + { + output_type: :full, + config: config_element("ROOT", "", {}, [config_element("buffer", "", {})]), + workers: 1, + expect_worker_lock: false, + expect_thread_lock: false, + } + ) + data( + "Buffered with multiple threads and single worker", + { + output_type: :full, + config: config_element("ROOT", "", {}, [config_element("buffer", "", {"flush_thread_count" => 8})]), + workers: 1, + expect_worker_lock: false, + expect_thread_lock: true, + } + ) + data( + "Buffered with single thread and multiple workers", + { + output_type: :full, + config: config_element("ROOT", "", {}, [config_element("buffer", "", {})]), + workers: 4, + expect_worker_lock: true, + expect_thread_lock: false, + } + ) + data( + "Buffered with multiple threads and multiple workers", + { + output_type: :full, + config: config_element("ROOT", "", {}, [config_element("buffer", "", {"flush_thread_count" => 8})]), + workers: 4, + expect_worker_lock: true, + expect_thread_lock: true, + } + ) + test "acquire_lock_if_need" do |data| + o = create_output(data[:output_type]) + o.configure(data[:config]) + o.system_config_override(workers: data[:workers]) + + test_lock_name = "test_lock_name" + lock_path = o.get_lock_path(test_lock_name) + + o.acquire_lock_if_need(test_lock_name) do + assert_worker_lock(lock_path, data[:expect_worker_lock]) + assert_thread_lock(o, data[:expect_thread_lock]) + end + + assert_worker_lock(lock_path, false) + assert_thread_lock(o, false) + end + + data( + "Buffered with single thread and single worker", + { + output_type: :full, + config: config_element( + "ROOT", "", {}, + [ + config_element("buffer", "", {}), + config_element("secondary", "", {"@type" => "test", "name" => "test"}), + ] + ), + workers: 1, + expect_worker_lock: false, + expect_thread_lock: false, + } + ) + data( + "Buffered with multiple threads and single worker", + { + output_type: :full, + config: config_element( + "ROOT", "", {}, + [ + config_element("buffer", "", {"flush_thread_count" => 8}), + config_element("secondary", "", {"@type" => "test", "name" => "test"}), + ] + ), + workers: 1, + expect_worker_lock: false, + expect_thread_lock: true, + } + ) + data( + "Buffered with single thread and multiple workers", + { + output_type: :full, + config: config_element( + "ROOT", "", {}, + [ + config_element("buffer", "", {}), + config_element("secondary", "", {"@type" => "test", "name" => "test"}), + ] + ), + workers: 4, + expect_worker_lock: true, + expect_thread_lock: false, + } + ) + data( + "Buffered with multiple threads and multiple workers", + { + output_type: :full, + config: config_element( + "ROOT", "", {}, + [ + config_element("buffer", "", {"flush_thread_count" => 8}), + config_element("secondary", "", {"@type" => "test", "name" => "test"}), + ] + ), + workers: 4, + expect_worker_lock: true, + expect_thread_lock: true, + } + ) + test "acquire_lock_if_need for secondary" do |data| + primary = create_output(data[:output_type]) + primary.configure(data[:config]) + secondary = primary.secondary + secondary.system_config_override(workers: data[:workers]) + + test_lock_name = "test_lock_name" + lock_path = secondary.get_lock_path(test_lock_name) + + secondary.acquire_lock_if_need(test_lock_name) do + assert_worker_lock(lock_path, data[:expect_worker_lock]) + assert_thread_lock(secondary, data[:expect_thread_lock]) + end + + assert_worker_lock(lock_path, false) + assert_thread_lock(secondary, false) + end + end end From 722ab6cfc0f0bdff8e778b59173a487031493931 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Wed, 8 Mar 2023 15:24:02 +0900 Subject: [PATCH 09/10] Refactor method name Signed-off-by: Daijiro Fukuda --- lib/fluent/plugin/out_secondary_file.rb | 4 ++-- lib/fluent/plugin/output.rb | 21 ++++++++++----------- test/plugin/test_output.rb | 12 ++++++------ 3 files changed, 18 insertions(+), 19 deletions(-) diff --git a/lib/fluent/plugin/out_secondary_file.rb b/lib/fluent/plugin/out_secondary_file.rb index 1dff34256c..5cadf94984 100644 --- a/lib/fluent/plugin/out_secondary_file.rb +++ b/lib/fluent/plugin/out_secondary_file.rb @@ -116,7 +116,7 @@ def has_time_format?(str) def generate_path(path_without_suffix) if @append path = "#{path_without_suffix}#{@suffix}" - acquire_lock_if_need(path) do + synchronize_path(path) do yield path end return path @@ -129,7 +129,7 @@ def generate_path(path_without_suffix) break unless File.exist?(path) i += 1 end - acquire_lock_if_need(path) do + synchronize_path(path) do # If multiple processes or threads select the same path and another # one entered this locking block first, the file should already # exist and this one should retry to find new path. diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index f89575b0cd..690033eee4 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -604,29 +604,28 @@ def actual_flush_thread_count @primary_instance.buffer_config.flush_thread_count end - # Run the passed block in the appropriate lock condition for multiple threads and workers. - # The lock between workers is made for every `worker_lock_name`. - # (For multiple workers, the lock is shared if `worker_lock_name` is the same value). - # For multiple threads, `worker_lock_name` is not used, and the lock is shared by all - # threads in the same process. - def acquire_lock_if_need(worker_lock_name) - acquire_worker_lock_if_need(worker_lock_name) do - acquire_flush_thread_lock_if_need do + # Ensures `path` (filename or filepath) processable + # only by the current thread in the current process. + # For multiple workers, the lock is shared if `path` is the same value. + # For multiple threads, the lock is shared by all threads in the same process. + def synchronize_path(path) + synchronize_path_in_workers(path) do + synchronize_in_threads do yield end end end - def acquire_worker_lock_if_need(name) + def synchronize_path_in_workers(path) need_worker_lock = system_config.workers > 1 if need_worker_lock - acquire_worker_lock(name) { yield } + acquire_worker_lock(path) { yield } else yield end end - def acquire_flush_thread_lock_if_need + def synchronize_in_threads need_thread_lock = actual_flush_thread_count > 1 if need_thread_lock @flush_thread_mutex.synchronize { yield } diff --git a/test/plugin/test_output.rb b/test/plugin/test_output.rb index 34bf439bc0..6bda8e3e93 100644 --- a/test/plugin/test_output.rb +++ b/test/plugin/test_output.rb @@ -1139,7 +1139,7 @@ def invoke_slow_flush_log_threshold_test(i) end end - sub_test_case "acquire_lock_if_need" do + sub_test_case "synchronize_path" do def setup Dir.mktmpdir do |lock_dir| ENV['FLUENTD_LOCK_DIR'] = lock_dir @@ -1162,7 +1162,7 @@ def assert_worker_lock(lock_path, expect_locked) def assert_thread_lock(output_plugin, expect_locked) t = Thread.new do - output_plugin.acquire_lock_if_need("test") do + output_plugin.synchronize_path("test") do end end if expect_locked @@ -1232,7 +1232,7 @@ def assert_thread_lock(output_plugin, expect_locked) expect_thread_lock: true, } ) - test "acquire_lock_if_need" do |data| + test "synchronize_path" do |data| o = create_output(data[:output_type]) o.configure(data[:config]) o.system_config_override(workers: data[:workers]) @@ -1240,7 +1240,7 @@ def assert_thread_lock(output_plugin, expect_locked) test_lock_name = "test_lock_name" lock_path = o.get_lock_path(test_lock_name) - o.acquire_lock_if_need(test_lock_name) do + o.synchronize_path(test_lock_name) do assert_worker_lock(lock_path, data[:expect_worker_lock]) assert_thread_lock(o, data[:expect_thread_lock]) end @@ -1313,7 +1313,7 @@ def assert_thread_lock(output_plugin, expect_locked) expect_thread_lock: true, } ) - test "acquire_lock_if_need for secondary" do |data| + test "synchronize_path for secondary" do |data| primary = create_output(data[:output_type]) primary.configure(data[:config]) secondary = primary.secondary @@ -1322,7 +1322,7 @@ def assert_thread_lock(output_plugin, expect_locked) test_lock_name = "test_lock_name" lock_path = secondary.get_lock_path(test_lock_name) - secondary.acquire_lock_if_need(test_lock_name) do + secondary.synchronize_path(test_lock_name) do assert_worker_lock(lock_path, data[:expect_worker_lock]) assert_thread_lock(secondary, data[:expect_thread_lock]) end From 70bf59396ad840e2a75feed6f5c127765dc6ad87 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Thu, 9 Mar 2023 09:42:28 +0900 Subject: [PATCH 10/10] Fix typo Signed-off-by: Daijiro Fukuda Co-authored-by: Takuro Ashie --- test/plugin/test_output.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/plugin/test_output.rb b/test/plugin/test_output.rb index 6bda8e3e93..7bbd5cb5a9 100644 --- a/test/plugin/test_output.rb +++ b/test/plugin/test_output.rb @@ -1105,7 +1105,7 @@ def invoke_slow_flush_log_threshold_test(i) end data( - "Buffered with singile thread", + "Buffered with single thread", { output_type: :full, config: config_element(