diff --git a/lib/fluent/error.rb b/lib/fluent/error.rb index 564b1c8d63..251094c820 100644 --- a/lib/fluent/error.rb +++ b/lib/fluent/error.rb @@ -28,6 +28,9 @@ def to_s class InvalidRootDirectory < UnrecoverableError end + class InvalidLockDirectory < UnrecoverableError + end + # For internal use class UncatchableError < Exception end diff --git a/lib/fluent/plugin/base.rb b/lib/fluent/plugin/base.rb index 601e216b81..1846273a17 100644 --- a/lib/fluent/plugin/base.rb +++ b/lib/fluent/plugin/base.rb @@ -31,6 +31,7 @@ class Base def initialize @log = nil super + @fluentd_lock_dir = ENV['FLUENTD_LOCK_DIR'] @_state = State.new(false, false, false, false, false, false, false, false, false) @_context_router = nil @_fluentd_worker_id = nil @@ -70,6 +71,21 @@ def multi_workers_ready? true end + def get_lock_path(name) + name = name.gsub(/[^a-zA-Z0-9]/, "_") + File.join(@fluentd_lock_dir, "fluentd-#{name}.lock") + end + + def acquire_worker_lock(name) + if @fluentd_lock_dir.nil? + raise InvalidLockDirectory, "can't acquire lock because FLUENTD_LOCKDIR isn't set" + end + File.open(get_lock_path(name), "w") do |f| + f.flock(File::LOCK_EX) + yield + end + end + def string_safe_encoding(str) unless str.valid_encoding? str = str.scrub('?') diff --git a/lib/fluent/plugin/out_file.rb b/lib/fluent/plugin/out_file.rb index f09c149eb1..7bf25a1b6e 100644 --- a/lib/fluent/plugin/out_file.rb +++ b/lib/fluent/plugin/out_file.rb @@ -188,6 +188,10 @@ def configure(conf) condition = Gem::Dependency.new('', [">= 2.7.0", "< 3.1.0"]) @need_ruby_on_macos_workaround = true if condition.match?('', RUBY_VERSION) end + + if @need_lock && @append && fluentd_lockdir.nil? + raise InvalidLockDirectory, "must set FLUENTD_LOCKDIR on multi-worker append mode" + end end def multi_workers_ready? @@ -217,7 +221,13 @@ def write(chunk) end if @append - writer.call(path, chunk) + if @need_lock + acquire_worker_lock(path) do + writer.call(path, chunk) + end + else + writer.call(path, chunk) + end else find_filepath_available(path, with_lock: @need_lock) do |actual_path| writer.call(actual_path, chunk) diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index 0bd82ece47..854e53cb12 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -875,7 +875,11 @@ def supervise se = ServerEngine.create(ServerModule, WorkerModule){ Fluent::Supervisor.load_config(@config_path, params) } - se.run + + Dir.mktmpdir("fluentd-lock-") do |lock_dir| + ENV['FLUENTD_LOCK_DIR'] = lock_dir + se.run + end end def install_main_process_signal_handlers diff --git a/test/plugin/test_base.rb b/test/plugin/test_base.rb index 51d55c1b69..b9567c9228 100644 --- a/test/plugin/test_base.rb +++ b/test/plugin/test_base.rb @@ -1,4 +1,5 @@ require_relative '../helper' +require 'tmpdir' require 'fluent/plugin/base' module FluentPluginBaseTest @@ -112,4 +113,37 @@ class FluentPluginBaseTest::DummyPlugin2 < Fluent::Plugin::TestBase assert_equal 1, logger.logs.size assert{ logger.logs.first.include?("invalid byte sequence is replaced in ") } end + + test 'generates worker lock path safely' do + Dir.mktmpdir("test-fluentd-lock-") do |lock_dir| + ENV['FLUENTD_LOCK_DIR'] = lock_dir + p = FluentPluginBaseTest::DummyPlugin.new + path = p.get_lock_path("Aa\\|=~/_123"); + + assert_equal lock_dir, File.dirname(path) + assert_equal "fluentd-Aa______123.lock", File.basename(path) + end + end + + test 'can acquire inter-worker locking' do + Dir.mktmpdir("test-fluentd-lock-") do |lock_dir| + ENV['FLUENTD_LOCK_DIR'] = lock_dir + p = FluentPluginBaseTest::DummyPlugin.new + lock_path = p.get_lock_path("test_base") + + p.acquire_worker_lock("test_base") do + # With LOCK_NB set, flock() returns `false` when the + # file is already locked. + File.open(lock_path, "w") do |f| + assert_equal false, f.flock(File::LOCK_EX|File::LOCK_NB) + end + end + + # Lock should be release by now. In that case, flock + # must return 0. + File.open(lock_path, "w") do |f| + assert_equal 0, f.flock(File::LOCK_EX|File::LOCK_NB) + end + end + end end