diff --git a/.github/workflows/windows-test.yaml b/.github/workflows/windows-test.yaml index 1f43f18e57..12a7537c57 100644 --- a/.github/workflows/windows-test.yaml +++ b/.github/workflows/windows-test.yaml @@ -13,11 +13,14 @@ jobs: strategy: fail-fast: false matrix: - ruby-version: ['2.7', '2.6'] + ruby-version: ['3.1', '2.7'] os: - windows-latest experimental: [false] include: + - ruby-version: head + os: windows-latest + experimental: true - ruby-version: '3.0.3' os: windows-latest experimental: false diff --git a/lib/fluent/plugin/file_wrapper.rb b/lib/fluent/plugin/file_wrapper.rb index 57d6b4949d..f05af89205 100644 --- a/lib/fluent/plugin/file_wrapper.rb +++ b/lib/fluent/plugin/file_wrapper.rb @@ -16,8 +16,35 @@ module Fluent module FileWrapper - def self.open(*args) - io = WindowsFile.new(*args).io + include File::Constants + + def self.mode2flags(mode) + # Always need BINARY to enable SHARE_DELETE + # https://bugs.ruby-lang.org/issues/11218 + # https://github.com/ruby/ruby/blob/d6684f063bc53e3cab025bd39526eca3b480b5e7/win32/win32.c#L6332-L6345 + flags = BINARY | SHARE_DELETE + case mode.delete("b") + when "r" + flags |= RDONLY + when "r+" + flags |= RDWR + when "w" + flags |= WRONLY | CREAT | TRUNC + when "w+" + flags |= RDWR | CREAT | TRUNC + when "a" + flags |= WRONLY | CREAT | APPEND + when "a+" + flags |= RDWR | CREAT | APPEND + else + raise Errno::EINVAL.new("Unsupported mode by Fluent::FileWrapper: #{mode}") + end + end + + def self.open(path, mode='r') + # inject File::Constants::SHARE_DELETE + # https://github.com/fluent/fluentd/pull/3585#issuecomment-1101502617 + io = File.open(path, mode2flags(mode)) if block_given? v = yield io io.close @@ -35,17 +62,6 @@ def self.stat(path) end end - module WindowsFileExtension - attr_reader :path - - def stat - s = super - s.instance_variable_set :@ino, @ino - def s.ino; @ino; end - s - end - end - class Win32Error < StandardError require 'windows/error' include Windows::Error @@ -88,7 +104,9 @@ def wsaerr? end end - # To open and get stat with setting FILE_SHARE_DELETE + # To open and get stat with setting FILE_SHARE_DELETE. + # Although recent Ruby's File.stat uses it, we still need this to keep + # backward compatibility of ino and delete_pending methods. class WindowsFile require 'windows/file' require 'windows/error' @@ -135,16 +153,6 @@ def close @file_handle = INVALID_HANDLE_VALUE end - def io - fd = _open_osfhandle(@file_handle, 0) - raise Errno::ENOENT if fd == -1 - io = File.for_fd(fd, @mode) - io.instance_variable_set :@ino, self.ino - io.instance_variable_set :@path, @path - io.extend WindowsFileExtension - io - end - def ino by_handle_file_information = '\0'*(4+8+8+8+4+4+4+4+4+4) #72bytes diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index 53aca518c2..66a2776cc5 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -431,10 +431,12 @@ def try_write(chunk) ] stub(d.instance.ack_handler).read_ack_from_sock(anything).never - target_input_driver.run(expect_records: 2) do - d.run do - emit_events.each do |tag, t, record| - d.feed(tag, t, record) + assert_rr do + target_input_driver.run(expect_records: 2) do + d.run do + emit_events.each do |tag, t, record| + d.feed(tag, t, record) + end end end end @@ -461,10 +463,12 @@ def try_write(chunk) ] stub(d.instance.ack_handler).read_ack_from_sock(anything).never - target_input_driver.run(expect_records: 2) do - d.run(default_tag: 'test') do - records.each do |record| - d.feed(time, record) + assert_rr do + target_input_driver.run(expect_records: 2) do + d.run(default_tag: 'test') do + records.each do |record| + d.feed(time, record) + end end end end @@ -491,10 +495,12 @@ def try_write(chunk) {"a" => 2} ] stub(d.instance.ack_handler).read_ack_from_sock(anything).never - target_input_driver.run(expect_records: 2) do - d.run(default_tag: 'test') do - records.each do |record| - d.feed(time, record) + assert_rr do + target_input_driver.run(expect_records: 2) do + d.run(default_tag: 'test') do + records.each do |record| + d.feed(time, record) + end end end end @@ -549,10 +555,12 @@ def try_write(chunk) ] # not attempt to receive responses stub(d.instance.ack_handler).read_ack_from_sock(anything).never - target_input_driver.run(expect_records: 2) do - d.run(default_tag: 'test') do - records.each do |record| - d.feed(time, record) + assert_rr do + target_input_driver.run(expect_records: 2) do + d.run(default_tag: 'test') do + records.each do |record| + d.feed(time, record) + end end end end @@ -575,10 +583,12 @@ def try_write(chunk) ] # not attempt to receive responses stub(d.instance.ack_handler).read_ack_from_sock(anything).never - target_input_driver.run(expect_records: 2) do - d.run(default_tag: 'test') do - records.each do |record| - d.feed(time, record) + assert_rr do + target_input_driver.run(expect_records: 2) do + d.run(default_tag: 'test') do + records.each do |record| + d.feed(time, record) + end end end end