Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for concurrent append in out_file #3808

Closed
wants to merge 10 commits into from
19 changes: 19 additions & 0 deletions lib/fluent/plugin/base.rb
Expand Up @@ -51,6 +51,12 @@ def fluentd_worker_id
@_fluentd_worker_id
end

def fluentd_lockdir
return @_fluentd_lockdir if @_fluentd_lockdir
@_fluentd_lockdir = ENV['FLUENTD_LOCKDIR']
@_fluentd_lockdir
fujimotos marked this conversation as resolved.
Show resolved Hide resolved
end

def configure(conf)
if Fluent::Engine.supervisor_mode || (conf.respond_to?(:for_this_worker?) && conf.for_this_worker?)
workers = if conf.target_worker_ids && !conf.target_worker_ids.empty?
Expand All @@ -70,6 +76,19 @@ def multi_workers_ready?
true
end

def acquire_worker_lock(name, &block)
fujimotos marked this conversation as resolved.
Show resolved Hide resolved
if fluentd_lockdir.nil?
raise RuntimeError, "fail to create lockfile on '#{fluentd_lockdir}'"
fujimotos marked this conversation as resolved.
Show resolved Hide resolved
end

name = name.gsub(/[^a-zA-Z0-9]/, "_")
lockfile = "fluentd-#{name}.lock"
File.open(File.join(fluentd_lockdir, lockfile), "w") do |f|
f.flock(File::LOCK_EX)
block.call()
fujimotos marked this conversation as resolved.
Show resolved Hide resolved
end
end

def string_safe_encoding(str)
unless str.valid_encoding?
str = str.scrub('?')
Expand Down
8 changes: 7 additions & 1 deletion lib/fluent/plugin/out_file.rb
Expand Up @@ -217,7 +217,13 @@ def write(chunk)
end

if @append
writer.call(path, chunk)
if @need_lock
acquire_worker_lock(path) do
writer.call(path, chunk)
end
else
writer.call(path, chunk)
end
else
find_filepath_available(path, with_lock: @need_lock) do |actual_path|
writer.call(actual_path, chunk)
Expand Down
9 changes: 9 additions & 0 deletions lib/fluent/supervisor.rb
Expand Up @@ -50,6 +50,9 @@ def before_run
@rpc_server = nil
@counter = nil

@fluentd_lockdir = Dir.mktmpdir("fluentd-lock-")
fujimotos marked this conversation as resolved.
Show resolved Hide resolved
ENV['FLUENTD_LOCKDIR'] = @fluentd_lockdir

if config[:rpc_endpoint]
@rpc_endpoint = config[:rpc_endpoint]
@enable_get_dump = config[:enable_get_dump]
Expand Down Expand Up @@ -79,9 +82,15 @@ def after_run
stop_windows_event_thread if Fluent.windows?
stop_rpc_server if @rpc_endpoint
stop_counter_server if @counter
cleanup_lockdir
Fluent::Supervisor.cleanup_resources
end

def cleanup_lockdir
FileUtils.rm(Dir.glob(File.join(@fluentd_lockdir, "fluentd-*.lock")))
FileUtils.rmdir(@fluentd_lockdir)
fujimotos marked this conversation as resolved.
Show resolved Hide resolved
end

def run_rpc_server
@rpc_server = RPC::Server.new(@rpc_endpoint, $log)

Expand Down