Skip to content

Commit

Permalink
Merge pull request #3711 from fluent/update-keepalive-timeout
Browse files Browse the repository at this point in the history
out_forward: Fix to update timeout of cached sockets
  • Loading branch information
ashie committed May 27, 2022
2 parents 367573b + cd6ade0 commit 7b95dca
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 1 deletion.
2 changes: 2 additions & 0 deletions lib/fluent/plugin/out_forward/socket_cache.rb
Expand Up @@ -50,6 +50,7 @@ def checkout_or(key)
def checkin(sock)
@mutex.synchronize do
if (s = @inflight_sockets.delete(sock))
s.timeout = timeout
@available_sockets[s.key] << s
else
@log.debug("there is no socket #{sock}")
Expand Down Expand Up @@ -122,6 +123,7 @@ def pick_socket(key)
t = Time.now
if (s = @available_sockets[key].find { |sock| !expired_socket?(sock, time: t) })
@inflight_sockets[s.sock] = @available_sockets[key].delete(s)
s.timeout = timeout
s
else
nil
Expand Down
27 changes: 26 additions & 1 deletion test/plugin/out_forward/test_socket_cache.rb
Expand Up @@ -110,6 +110,10 @@ class SocketCacheTest < Test::Unit::TestCase
end

sub_test_case 'purge_obsolete_socks' do
def teardown
Timecop.return
end

test 'delete key in inactive_socks' do
c = Fluent::Plugin::ForwardOutput::SocketCache.new(10, $log)
sock = mock!.close { 'closed' }.subject
Expand All @@ -134,7 +138,7 @@ class SocketCacheTest < Test::Unit::TestCase
c.checkin(sock)

# wait timeout
Timecop.freeze(Time.parse('2016-04-13 14:20:00 +0900'))
Timecop.freeze(Time.parse('2016-04-13 14:00:11 +0900'))
c.checkout_or('key') { sock2 }

assert_equal(1, c.instance_variable_get(:@inflight_sockets).size)
Expand All @@ -145,5 +149,26 @@ class SocketCacheTest < Test::Unit::TestCase
assert_equal(1, c.instance_variable_get(:@inflight_sockets).size)
assert_equal(sock2, c.instance_variable_get(:@inflight_sockets).values.first.sock)
end

test 'should not purge just after checkin and purge after timeout' do
Timecop.freeze(Time.parse('2016-04-13 14:00:00 +0900'))

c = Fluent::Plugin::ForwardOutput::SocketCache.new(10, $log)
sock = dont_allow(mock!).close
stub(sock).inspect
c.checkout_or('key') { sock }

Timecop.freeze(Time.parse('2016-04-13 14:00:11 +0900'))
c.checkin(sock)

assert_equal(1, c.instance_variable_get(:@available_sockets).size)
c.purge_obsolete_socks
assert_equal(1, c.instance_variable_get(:@available_sockets).size)

Timecop.freeze(Time.parse('2016-04-13 14:00:22 +0900'))
assert_equal(1, c.instance_variable_get(:@available_sockets).size)
c.purge_obsolete_socks
assert_equal(0, c.instance_variable_get(:@available_sockets).size)
end
end
end

0 comments on commit 7b95dca

Please sign in to comment.