Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition of out_secondary_file #4081

Merged
merged 10 commits into from Mar 9, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
61 changes: 39 additions & 22 deletions lib/fluent/plugin/out_secondary_file.rb
Expand Up @@ -64,31 +64,29 @@ def configure(conf)
end

def multi_workers_ready?
### TODO: add hack to synchronize for multi workers
true
end

def write(chunk)
path_without_suffix = extract_placeholders(@path_without_suffix, chunk)
path = generate_path(path_without_suffix)
FileUtils.mkdir_p File.dirname(path), mode: @dir_perm

case @compress
when :text
File.open(path, "ab", @file_perm) {|f|
f.flock(File::LOCK_EX)
chunk.write_to(f)
}
when :gzip
File.open(path, "ab", @file_perm) {|f|
f.flock(File::LOCK_EX)
gz = Zlib::GzipWriter.new(f)
chunk.write_to(gz)
gz.close
}
generate_path(path_without_suffix) do |path|
FileUtils.mkdir_p File.dirname(path), mode: @dir_perm

case @compress
when :text
File.open(path, "ab", @file_perm) {|f|
f.flock(File::LOCK_EX)
chunk.write_to(f)
}
when :gzip
File.open(path, "ab", @file_perm) {|f|
f.flock(File::LOCK_EX)
gz = Zlib::GzipWriter.new(f)
chunk.write_to(gz)
gz.close
}
end
end

path
end

private
Expand Down Expand Up @@ -117,15 +115,34 @@ def has_time_format?(str)

def generate_path(path_without_suffix)
if @append
"#{path_without_suffix}#{@suffix}"
else
path = "#{path_without_suffix}#{@suffix}"
acquire_lock_if_need(path) do
yield path
end
return path
end

begin
i = 0
loop do
path = "#{path_without_suffix}.#{i}#{@suffix}"
return path unless File.exist?(path)
break unless File.exist?(path)
i += 1
end
acquire_lock_if_need(path) do
# If multiple processes or threads select the same path and another
# one entered this locking block first, the file should already
# exist and this one should retry to find new path.
raise FileAlreadyExist if File.exist?(path)
yield path
end
rescue FileAlreadyExist
retry
end
path
end

class FileAlreadyExist < StandardError
end
end
end
38 changes: 38 additions & 0 deletions lib/fluent/plugin/output.rb
Expand Up @@ -198,6 +198,7 @@ def rollback_count
def initialize
super
@counter_mutex = Mutex.new
@flush_thread_mutex = Mutex.new
@buffering = false
@delayed_commit = false
@as_secondary = false
Expand Down Expand Up @@ -597,6 +598,43 @@ def terminate
super
end

def actual_flush_thread_count
return 0 unless @buffering
return @buffer_config.flush_thread_count unless @as_secondary
@primary_instance.buffer_config.flush_thread_count
end

# Run the passed block in the appropriate lock condition for multiple threads and workers.
# The lock between workers is made for every `worker_lock_name`.
# (For multiple workers, the lock is shared if `worker_lock_name` is the same value).
# For multiple threads, `worker_lock_name` is not used, and the lock is shared by all
# threads in the same process.
def acquire_lock_if_need(worker_lock_name)
daipom marked this conversation as resolved.
Show resolved Hide resolved
acquire_worker_lock_if_need(worker_lock_name) do
acquire_flush_thread_lock_if_need do
yield
end
end
end

def acquire_worker_lock_if_need(name)
need_worker_lock = system_config.workers > 1
if need_worker_lock
acquire_worker_lock(name) { yield }
else
yield
end
end

def acquire_flush_thread_lock_if_need
need_thread_lock = actual_flush_thread_count > 1
if need_thread_lock
@flush_thread_mutex.synchronize { yield }
else
yield
end
end

def support_in_v12_style?(feature)
# for plugins written in v0.12 styles
case feature
Expand Down