Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for concurrent append in out_file #3808

Closed
wants to merge 10 commits into from
3 changes: 3 additions & 0 deletions lib/fluent/error.rb
Expand Up @@ -28,6 +28,9 @@ def to_s
class InvalidRootDirectory < UnrecoverableError
end

class InvalidLockDirectory < UnrecoverableError
end

# For internal use
class UncatchableError < Exception
end
Expand Down
16 changes: 16 additions & 0 deletions lib/fluent/plugin/base.rb
Expand Up @@ -31,6 +31,7 @@ class Base
def initialize
@log = nil
super
@fluentd_lock_dir = ENV['FLUENTD_LOCK_DIR']
@_state = State.new(false, false, false, false, false, false, false, false, false)
@_context_router = nil
@_fluentd_worker_id = nil
Expand Down Expand Up @@ -70,6 +71,21 @@ def multi_workers_ready?
true
end

def get_lock_path(name)
name = name.gsub(/[^a-zA-Z0-9]/, "_")
File.join(@fluentd_lock_dir, "fluentd-#{name}.lock")
end

def acquire_worker_lock(name)
if @fluentd_lock_dir.nil?
raise InvalidLockDirectory, "can't acquire lock because FLUENTD_LOCKDIR isn't set"
end
File.open(get_lock_path(name), "w") do |f|
f.flock(File::LOCK_EX)
yield
end
end

def string_safe_encoding(str)
unless str.valid_encoding?
str = str.scrub('?')
Expand Down
12 changes: 11 additions & 1 deletion lib/fluent/plugin/out_file.rb
Expand Up @@ -188,6 +188,10 @@ def configure(conf)
condition = Gem::Dependency.new('', [">= 2.7.0", "< 3.1.0"])
@need_ruby_on_macos_workaround = true if condition.match?('', RUBY_VERSION)
end

if @need_lock && @append && fluentd_lockdir.nil?
raise InvalidLockDirectory, "must set FLUENTD_LOCKDIR on multi-worker append mode"
end
end

def multi_workers_ready?
Expand Down Expand Up @@ -217,7 +221,13 @@ def write(chunk)
end

if @append
writer.call(path, chunk)
if @need_lock
acquire_worker_lock(path) do
writer.call(path, chunk)
end
else
writer.call(path, chunk)
end
else
find_filepath_available(path, with_lock: @need_lock) do |actual_path|
writer.call(actual_path, chunk)
Expand Down
6 changes: 5 additions & 1 deletion lib/fluent/supervisor.rb
Expand Up @@ -875,7 +875,11 @@ def supervise
se = ServerEngine.create(ServerModule, WorkerModule){
Fluent::Supervisor.load_config(@config_path, params)
}
se.run

Dir.mktmpdir("fluentd-lock-") do |lock_dir|
ENV['FLUENTD_LOCK_DIR'] = lock_dir
se.run
end
end

def install_main_process_signal_handlers
Expand Down
34 changes: 34 additions & 0 deletions test/plugin/test_base.rb
@@ -1,4 +1,5 @@
require_relative '../helper'
require 'tmpdir'
require 'fluent/plugin/base'

module FluentPluginBaseTest
Expand Down Expand Up @@ -112,4 +113,37 @@ class FluentPluginBaseTest::DummyPlugin2 < Fluent::Plugin::TestBase
assert_equal 1, logger.logs.size
assert{ logger.logs.first.include?("invalid byte sequence is replaced in ") }
end

test 'generates worker lock path safely' do
Dir.mktmpdir("test-fluentd-lock-") do |lock_dir|
ENV['FLUENTD_LOCK_DIR'] = lock_dir
p = FluentPluginBaseTest::DummyPlugin.new
path = p.get_lock_path("Aa\\|=~/_123");

assert_equal lock_dir, File.dirname(path)
assert_equal "fluentd-Aa______123.lock", File.basename(path)
end
end

test 'can acquire inter-worker locking' do
Dir.mktmpdir("test-fluentd-lock-") do |lock_dir|
ENV['FLUENTD_LOCK_DIR'] = lock_dir
p = FluentPluginBaseTest::DummyPlugin.new
lock_path = p.get_lock_path("test_base")

p.acquire_worker_lock("test_base") do
# With LOCK_NB set, flock() returns `false` when the
# file is already locked.
File.open(lock_path, "w") do |f|
assert_equal false, f.flock(File::LOCK_EX|File::LOCK_NB)
end
end

# Lock should be release by now. In that case, flock
# must return 0.
File.open(lock_path, "w") do |f|
assert_equal 0, f.flock(File::LOCK_EX|File::LOCK_NB)
end
end
end
end