Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition of out_secondary_file #4081

Merged
merged 10 commits into from Mar 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
61 changes: 39 additions & 22 deletions lib/fluent/plugin/out_secondary_file.rb
Expand Up @@ -64,31 +64,29 @@ def configure(conf)
end

def multi_workers_ready?
### TODO: add hack to synchronize for multi workers
true
end

def write(chunk)
path_without_suffix = extract_placeholders(@path_without_suffix, chunk)
path = generate_path(path_without_suffix)
FileUtils.mkdir_p File.dirname(path), mode: @dir_perm

case @compress
when :text
File.open(path, "ab", @file_perm) {|f|
f.flock(File::LOCK_EX)
chunk.write_to(f)
}
when :gzip
File.open(path, "ab", @file_perm) {|f|
f.flock(File::LOCK_EX)
gz = Zlib::GzipWriter.new(f)
chunk.write_to(gz)
gz.close
}
generate_path(path_without_suffix) do |path|
FileUtils.mkdir_p File.dirname(path), mode: @dir_perm

case @compress
when :text
File.open(path, "ab", @file_perm) {|f|
f.flock(File::LOCK_EX)
chunk.write_to(f)
}
when :gzip
File.open(path, "ab", @file_perm) {|f|
f.flock(File::LOCK_EX)
gz = Zlib::GzipWriter.new(f)
chunk.write_to(gz)
gz.close
}
end
end

path
end

private
Expand Down Expand Up @@ -117,15 +115,34 @@ def has_time_format?(str)

def generate_path(path_without_suffix)
if @append
"#{path_without_suffix}#{@suffix}"
else
path = "#{path_without_suffix}#{@suffix}"
synchronize_path(path) do
yield path
end
return path
end

begin
i = 0
loop do
path = "#{path_without_suffix}.#{i}#{@suffix}"
return path unless File.exist?(path)
break unless File.exist?(path)
i += 1
end
synchronize_path(path) do
# If multiple processes or threads select the same path and another
# one entered this locking block first, the file should already
# exist and this one should retry to find new path.
raise FileAlreadyExist if File.exist?(path)
yield path
end
rescue FileAlreadyExist
retry
end
path
end

class FileAlreadyExist < StandardError
end
end
end
37 changes: 37 additions & 0 deletions lib/fluent/plugin/output.rb
Expand Up @@ -198,6 +198,7 @@ def rollback_count
def initialize
super
@counter_mutex = Mutex.new
@flush_thread_mutex = Mutex.new
@buffering = false
@delayed_commit = false
@as_secondary = false
Expand Down Expand Up @@ -597,6 +598,42 @@ def terminate
super
end

def actual_flush_thread_count
return 0 unless @buffering
return @buffer_config.flush_thread_count unless @as_secondary
@primary_instance.buffer_config.flush_thread_count
end

# Ensures `path` (filename or filepath) processable
# only by the current thread in the current process.
# For multiple workers, the lock is shared if `path` is the same value.
# For multiple threads, the lock is shared by all threads in the same process.
def synchronize_path(path)
synchronize_path_in_workers(path) do
synchronize_in_threads do
yield
end
end
end

def synchronize_path_in_workers(path)
need_worker_lock = system_config.workers > 1
if need_worker_lock
acquire_worker_lock(path) { yield }
else
yield
end
end

def synchronize_in_threads
need_thread_lock = actual_flush_thread_count > 1
if need_thread_lock
@flush_thread_mutex.synchronize { yield }
else
yield
end
end

def support_in_v12_style?(feature)
# for plugins written in v0.12 styles
case feature
Expand Down