Out forward stuck establishing connection #4396

Open
waffleisreal1339 opened this issue Feb 6, 2024 · 8 comments

@waffleisreal1339

Describe the bug

I encountered a critical issue with Fluentd where it gets stuck in an infinite loop while establishing connections between out_forward and in_forward servers. Our infrastructure has an unstable connection, which may make this bug occur more frequently. The bug caused Fluentd to stop sending data (and heartbeats) even after the underlying network infrastructure was back online. Interestingly, killing the socket resolved the issue and caused the thread to resume sending data.

The simplest way I can describe the bug is that Fluentd suddenly stops sending any data to the remote server, with no errors in the logs, and if I check inside the container I can see an open socket in the "ESTABLISHED" state to the remote server.

After some digging in the code, I think I might have found the reason for this behaviour.
On every heartbeat/write, a new socket is created and connect(dest_addr) is called, which performs the TLS handshake.
After that, establish_connection is called to run the hello, ping, pong protocol.
It seems that if the connection between Fluentd and the destination server drops after the TLS handshake, by the time we reach establish_connection, Fluentd does not detect the drop or time out the socket. As far as I can tell, it gets stuck in this loop, where the socket is still considered up but never receives any data.

# in lib/fluent/plugin/out_forward.rb
def establish_connection(sock, ri)
  while ri.state != :established
    begin
      # TODO: On Ruby 2.2 or earlier, read_nonblock doesn't work expectedly.
      # We need rewrite around here using new socket/server plugin helper.
      buf = sock.read_nonblock(@sender.read_length)
      if buf.empty?
        sleep @sender.read_interval
        next
      end
      Fluent::MessagePackFactory.msgpack_unpacker.feed_each(buf) do |data|
        if @handshake.invoke(sock, ri, data) == :established
          @log.debug "connection established", host: @host, port: @port
        end
      end
      # (excerpt ends here; the rescue clauses and the closing "end"s are not shown)
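
To make the suspected failure mode concrete, here is a minimal sketch that is independent of Fluentd. It uses a local `UNIXSocket.pair` as a stand-in for the forward connection (an assumption made only for illustration) and a hypothetical helper named `probe`. It shows that a peer that stays connected but silent looks identical on every non-blocking read, while a peer that closes is reported as EOF.

```ruby
require "socket"

# Classify one non-blocking read attempt without raising.
# Returns :no_data_yet, :peer_closed, or the bytes that were read.
def probe(sock, max_bytes = 512)
  result = sock.read_nonblock(max_bytes, exception: false)
  case result
  when :wait_readable then :no_data_yet  # peer is silent, socket still open
  when nil            then :peer_closed  # orderly EOF from the peer
  else result
  end
end

client, server = UNIXSocket.pair

# The peer stays connected but sends nothing: every probe looks the same.
silent = Array.new(3) { probe(client) }

server.write("PONG")
got_data = probe(client)

server.close
closed = probe(client)
client.close

p silent    # [:no_data_yet, :no_data_yet, :no_data_yet]
p got_data  # "PONG"
p closed    # :peer_closed
```

A loop that only exits on data or EOF therefore has nothing to react to when an intermediate component drops traffic without closing the socket.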

It's important to note that in our infrastructure the out_forward and in_forward servers don't connect directly to each other; there are a few components in between them. If the overall connection drops, it doesn't necessarily mean that the socket will drop, so we have to rely on options like timeouts.

It would seem that the socket options SO_RCVTIMEO/SO_SNDTIMEO should cause an exception to be raised, breaking the loop, but as far as I can tell those socket options are problematic in Ruby:
https://stackoverflow.com/questions/9853516/set-socket-timeout-in-ruby-via-so-rcvtimeo-socket-option#:~:text=Based%20on%20my%20testing%2C%20and%20Jesse%20Storimer%27s%20excellent%20ebook%20on%20%22Working%20with%20TCP%20Sockets%22%20(in%20Ruby)%2C%20the%20timeout%20socket%20options%20do%20not%20work%20in%20Ruby%201.9%20(and%2C%20I%20presume%202.0%20and%202.1).%20Jesse%20says%3A
https://www.ruby-forum.com/t/how-to-set-socket-option-so-rcvtimeo/77758/8
https://stackoverflow.com/questions/57895907/ruby-2-5-set-timeout-on-tcp-sockets-read
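
One piece of this can be checked in isolation. The sketch below (not Fluentd code) sets SO_RCVTIMEO on a local socket pair and then calls `read_nonblock`. Because the read is non-blocking, the call reports "would block" immediately instead of waiting for the timeout, so the option cannot end a `read_nonblock` retry loop by itself. The `struct timeval` packing is an assumption that matches common 64-bit Linux and macOS layouts.

```ruby
require "socket"

client, server = UNIXSocket.pair

# Ask the kernel for a 2-second receive timeout (struct timeval: sec, usec).
# Packing two native longs is an assumption about the platform's layout.
timeval = [2, 0].pack("l_2")
client.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, timeval)

started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
outcome =
  begin
    client.read_nonblock(512)
  rescue IO::WaitReadable
    :would_block
  end
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

puts "outcome=#{outcome} elapsed=#{elapsed.round(3)}s"

client.close
server.close
```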

To Reproduce

I'm not sure how to reproduce the issue consistently; it seems to depend on when the connection is dropped and where establish_connection is at that moment.

Expected behavior

I expect the socket to time out if it fails to establish a connection after some time.
Perhaps using IO.select.
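
A minimal sketch of what an IO.select-based wait with an overall deadline could look like. This is not a patch for out_forward: `read_with_deadline` is a hypothetical helper, and a plain local socket pair stands in for the TLS connection.

```ruby
require "socket"

# Hypothetical helper: read up to max_bytes, waiting at most `timeout`
# seconds in total. Returns the data, nil on EOF, or :timeout.
def read_with_deadline(sock, max_bytes, timeout)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  loop do
    result = sock.read_nonblock(max_bytes, exception: false)
    return result unless result == :wait_readable

    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    return :timeout if remaining <= 0

    # Sleep until the socket becomes readable or the remaining time runs out.
    IO.select([sock], nil, nil, remaining)
  end
end

client, server = UNIXSocket.pair

timed_out = read_with_deadline(client, 512, 0.2)  # peer is silent
server.write("PONG")
data = read_with_deadline(client, 512, 0.2)       # peer has answered

p timed_out  # :timeout
p data       # "PONG"

client.close
server.close
```

With a TLS socket, a non-blocking read can also ask to wait for writability, so a real implementation would need to handle that case as well; this sketch does not.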

Your Environment

- Fluentd version: 1.16.2

Your Configuration

<match **>
  @type forward
  transport tls
  tls_allow_self_signed_cert true
  tls_cert_path /opt/bitnami/fluentd/certs/tls/fluentd.crt
  require_ack_response true
  <server>
    host ****
    port 24224
  </server>
  <buffer>
    @type file
    path *****
    total_limit_size 4096MB
    chunk_limit_size 16MB
    flush_mode interval
    flush_interval 5s
    flush_thread_count 8
    retry_type periodic
    retry_wait 10s
    retry_forever true
    queued_chunks_limit_size 8
  </buffer>
</match>

Your Error Log

The logs don't contain much information; Fluentd mostly stops printing any logs.
For the time being I also cannot supply them because they contain PPI information. I'll sanitise and attach them once I can.
I have also turned on trace-level logging on my instances to capture later occurrences of the bug.

Additional context

No response

@daipom
Contributor

daipom commented Feb 20, 2024

After some digging in the code, I think I might have found the reason for this behaviour.
On every heartbeat/write, a new socket is created and connect(dest_addr) is called, which performs the TLS handshake.
After that, establish_connection is called to run the hello, ping, pong protocol.
It seems that if the connection between Fluentd and the destination server drops after the TLS handshake, by the time we reach establish_connection, Fluentd does not detect the drop or time out the socket. As far as I can tell, it gets stuck in this loop, where the socket is still considered up but never receives any data.

I expect the socket to time out if it fails to establish a connection after some time.
Perhaps using IO.select.

Thanks for your report. We may need to do some digging.
We need a simple way to reproduce the problem.

@Gtharan10

Gtharan10 commented Mar 24, 2024

We're encountering a similar issue while distributing 80,000 events per second to two separate systems. Have there been any updates or workarounds identified for this?

@daipom
Contributor

daipom commented Mar 25, 2024

We're encountering a similar issue while distributing 80,000 events per second to two separate systems. Have there been any updates or workarounds identified for this?

No.
I have not made time to look into this issue in detail.

I'll see if I can reproduce it.
I'd be glad to receive any information that could help us reproduce the issue.

@Gtharan10

Yes...

With the below configurations for forwarder and aggregator:

Both of these are in a Multi Process Workers environment, with 4 workers on each node.

<match udp.input.**>
    @type forward
    require_ack_response true
    heartbeat_type udp
    <buffer>
        @type memory
        flush_interval 1s
        flush_thread_count 10
        chunk_limit_size 50m
        queue_limit_length 500
        chunk_limit_size 100m
        overflow_action drop_oldest_chunk
        retry_max_interval 10m
        retry_forever true
        delayed_commit_timeout 100
    </buffer>
    <server>
        host 10.10.1.2
        port 24224
        weight 60
    </server>
    <server>
        host 10.10.1.3
        port 24224
        weight 60
    </server>
</match>

<source>
    @type forward
    port 24224
    bind 0.0.0.0
    tag udp.forward
</source>

It appears that when attempting to distribute events, Node 1 receives events via UDP and then shares them with the other nodes using the forward plugin. However, after approximately 10 seconds, Fluentd enters a stalled state in which it no longer accepts new incoming events and only sends heartbeats. I haven't seen any error logs related to this behaviour.

After checking the network calls with strace, this is what we suspect:

[pid 54679] recvfrom(13, 0x7f70df800000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70df800000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70df800000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70df800000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70df800000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70df800000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70df800000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70df800000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70df800000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70df800000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70df800000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, "{\"reportType\": \"sslsession\", \"so"..., 10485760, 0, {sa_family=AF_INET, sin_port=htons(38897), sin_addr=inet_addr("10.10.1.5")}, [2048 => 16]) = 59168
[pid 54679] recvfrom(13, 0x7f70dec00000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70dec00000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70dec00000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70dec00000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70dec00000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70dec00000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70dec00000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70dec00000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70dec00000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70dec00000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70dec00000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70dec00000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70dec00000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54676] recvfrom(10, "\201\243ack\271YUgPCpFf8KyAOu0reJa0wg==\n", 512, 0, 0x7f70e6efd790, [2048 => 0]) = 31
[pid 54676] shutdown(10, SHUT_WR)       = 0
[pid 54669] socket(AF_INET, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, IPPROTO_TCP) = 10
[pid 54669] connect(10, {sa_family=AF_INET, sin_port=htons(24224), sin_addr=inet_addr("10.10.1.3")}, 16) = -1 EINPROGRESS (Operation now in progress)
[pid 54669] getsockopt(10, SOL_SOCKET, SO_ERROR, [0], [4]) = 0
[pid 54669] getsockopt(10, SOL_SOCKET, SO_ERROR, [0], [4]) = 0
[pid 54669] getsockname(10, {sa_family=AF_INET, sin_port=htons(51956), sin_addr=inet_addr("10.10.1.1")}, [2048 => 16]) = 0
[pid 54669] setsockopt(10, SOL_SOCKET, SO_LINGER, {l_onoff=1, l_linger=60}, 8) = 0
[pid 54669] setsockopt(10, SOL_SOCKET, SO_RCVTIMEO_OLD, "\276\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 16) = 0
[pid 54669] setsockopt(10, SOL_SOCKET, SO_SNDTIMEO_OLD, "<\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 16) = 0

<PROCESS STALLS HERE FOR A WHILE, DURING WHICH ONLY PERIODIC HEARTBEAT SIGNALS ARE SENT TO THE DEVICE>

[pid 54675] sendto(9, "\0", 1, 0, {sa_family=AF_INET, sin_port=htons(24224), sin_addr=inet_addr("10.10.1.2")}, 16) = 1
[pid 54675] sendto(9, "\0", 1, 0, {sa_family=AF_INET, sin_port=htons(24224), sin_addr=inet_addr("10.10.1.3")}, 16) = 1
[pid 54675] recvfrom(9, "\0", 512, 0, {sa_family=AF_INET, sin_port=htons(24224), sin_addr=inet_addr("10.10.1.3")}, [2048 => 16]) = 1
[pid 54675] recvfrom(9, "\0", 512, 0, {sa_family=AF_INET, sin_port=htons(24224), sin_addr=inet_addr("10.10.1.2")}, [2048 => 16]) = 1
[pid 54676] recvfrom(10, "\201\243ack\271YUgPDAI3vNb9cuJQbyVasg==\n", 512, 0, 0x7f70e6efd790, [2048 => 0]) = 31
[pid 54676] shutdown(10, SHUT_WR)       = 0
[pid 54675] sendto(9, "\0", 1, 0, {sa_family=AF_INET, sin_port=htons(24224), sin_addr=inet_addr("10.10.1.2")}, 16) = 1
[pid 54675] sendto(9, "\0", 1, 0, {sa_family=AF_INET, sin_port=htons(24224), sin_addr=inet_addr("10.10.1.3")}, 16) = 1
[pid 54675] recvfrom(9, "\0", 512, 0, {sa_family=AF_INET, sin_port=htons(24224), sin_addr=inet_addr("10.10.1.2")}, [2048 => 16]) = 1
[pid 54675] recvfrom(9, "\0", 512, 0, {sa_family=AF_INET, sin_port=htons(24224), sin_addr=inet_addr("10.10.1.3")}, [2048 => 16]) = 1
[pid 54675] sendto(9, "\0", 1, 0, {sa_family=AF_INET, sin_port=htons(24224), sin_addr=inet_addr("10.10.1.2")}, 16) = 1
[pid 54675] sendto(9, "\0", 1, 0, {sa_family=AF_INET, sin_port=htons(24224), sin_addr=inet_addr("10.10.1.3")}, 16) = 1
[pid 54675] recvfrom(9, "\0", 512, 0, {sa_family=AF_INET, sin_port=htons(24224), sin_addr=inet_addr("10.10.1.2")}, [2048 => 16]) = 1
[pid 54675] recvfrom(9, "\0", 512, 0, {sa_family=AF_INET, sin_port=htons(24224), sin_addr=inet_addr("10.10.1.3")}, [2048 => 16]) = 1

<SKIPPING A FEW BEATS TO SHORTEN THE LOG>

[pid 54675] sendto(9, "\0", 1, 0, {sa_family=AF_INET, sin_port=htons(24224), sin_addr=inet_addr("10.10.1.2")}, 16) = 1
[pid 54675] sendto(9, "\0", 1, 0, {sa_family=AF_INET, sin_port=htons(24224), sin_addr=inet_addr("10.10.1.3")}, 16) = 1
[pid 54675] recvfrom(9, "\0", 512, 0, {sa_family=AF_INET, sin_port=htons(24224), sin_addr=inet_addr("10.10.1.2")}, [2048 => 16]) = 1
[pid 54675] recvfrom(9, "\0", 512, 0, {sa_family=AF_INET, sin_port=htons(24224), sin_addr=inet_addr("10.10.1.3")}, [2048 => 16]) = 1
[pid 54675] sendto(9, "\0", 1, 0, {sa_family=AF_INET, sin_port=htons(24224), sin_addr=inet_addr("10.10.1.2")}, 16) = 1
[pid 54675] sendto(9, "\0", 1, 0, {sa_family=AF_INET, sin_port=htons(24224), sin_addr=inet_addr("10.10.1.3")}, 16) = 1
[pid 54675] recvfrom(9, "\0", 512, 0, {sa_family=AF_INET, sin_port=htons(24224), sin_addr=inet_addr("10.10.1.2")}, [2048 => 16]) = 1
[pid 54675] recvfrom(9, "\0", 512, 0, {sa_family=AF_INET, sin_port=htons(24224), sin_addr=inet_addr("10.10.1.3")}, [2048 => 16]) = 1
[pid 54675] sendto(9, "\0", 1, 0, {sa_family=AF_INET, sin_port=htons(24224), sin_addr=inet_addr("10.10.1.2")}, 16) = 1
[pid 54675] sendto(9, "\0", 1, 0, {sa_family=AF_INET, sin_port=htons(24224), sin_addr=inet_addr("10.10.1.3")}, 16) = 1
[pid 54675] recvfrom(9, "\0", 512, 0, {sa_family=AF_INET, sin_port=htons(24224), sin_addr=inet_addr("10.10.1.2")}, [2048 => 16]) = 1
[pid 54675] recvfrom(9, "\0", 512, 0, {sa_family=AF_INET, sin_port=htons(24224), sin_addr=inet_addr("10.10.1.3")}, [2048 => 16]) = 1

<AGAIN PROCESS RESUMES HERE>

[pid 54679] recvfrom(13, 0x7f70dec00000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, "{\"reportType\": \"sslsession\", \"so"..., 10485760, 0, {sa_family=AF_INET, sin_port=htons(42461), sin_addr=inet_addr("10.10.1.5")}, [2048 => 16]) = 58968
[pid 54679] recvfrom(13, 0x7f70dd000000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, "{\"reportType\": \"sslsession\", \"so"..., 10485760, 0, {sa_family=AF_INET, sin_port=htons(60693), sin_addr=inet_addr("10.10.1.5")}, [2048 => 16]) = 59165
[pid 54679] recvfrom(13, 0x7f70e0400000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70e0400000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70e0400000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70e0400000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70e0400000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70e0400000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70e0400000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70e0400000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70e0400000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70e0400000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70e0400000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70e0400000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70e0400000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70e0400000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70e0400000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70e0400000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70e0400000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70e0400000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70e0400000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70e0400000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, "{\"reportType\": \"sslsession\", \"so"..., 10485760, 0, {sa_family=AF_INET, sin_port=htons(60693), sin_addr=inet_addr("10.10.1.5")}, [2048 => 16]) = 59165
[pid 54679] recvfrom(13, 0x7f70e1000000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70e1000000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70e1000000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70e1000000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70e1000000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70e1000000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70e1000000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70e1000000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70e1000000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70e1000000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, 0x7f70e1000000, 10485760, 0, 0x7f70e5afdaf0, [2048]) = -1 EAGAIN (Resource temporarily unavailable)
[pid 54679] recvfrom(13, "{\"reportType\": \"sslsession\", \"so"..., 10485760, 0, {sa_family=AF_INET, sin_port=htons(46832), sin_addr=inet_addr("10.10.1.5")}, [2048 => 16]) = 59071
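
As a reading aid for the two `setsockopt` lines in the trace above: the option values are raw `struct timeval` bytes, and they can be decoded as sketched below, assuming a 64-bit little-endian layout. The decoded values are 190 and 60 seconds. These would be consistent with out_forward's documented defaults for ack_response_timeout and send_timeout, but that mapping is an assumption and is not confirmed in this thread.

```ruby
# First byte of each option value as printed by strace, followed by 15 zero
# bytes. strace prints bytes as octal escapes, which Ruby literals accept too.
rcvtimeo_raw = ("\276" + "\0" * 15).b  # SO_RCVTIMEO_OLD value from the trace
sndtimeo_raw = ("<" + "\0" * 15).b     # SO_SNDTIMEO_OLD value from the trace

# Assumes a 64-bit little-endian struct timeval: tv_sec followed by tv_usec.
def decode_timeval(raw)
  sec, usec = raw.unpack("q<2")
  sec + usec / 1_000_000.0
end

rcv_seconds = decode_timeval(rcvtimeo_raw)
snd_seconds = decode_timeval(sndtimeo_raw)

puts "SO_RCVTIMEO=#{rcv_seconds}s SO_SNDTIMEO=#{snd_seconds}s"
# SO_RCVTIMEO=190.0s SO_SNDTIMEO=60.0s
```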

If there's any other information needed on this issue, please let me know.

@Gtharan10

Gtharan10 commented Apr 17, 2024

Did you manage to replicate the issue, @daipom?

@daipom
Contributor

daipom commented Apr 17, 2024

Sorry, I haven't made time for this.
Thanks for your information!

With the below configurations for forwarder and aggregator:

Does this mean that this issue reproduces within the same machine (setting in_forward and out_forward in the same config)?
If so, I think I could try to reproduce it.

Or, does this reproduce only under the following infrastructure (where in_forward and out_forward don't connect directly)?

It's important to note that in our infrastructure the out_forward and in_forward servers don't connect directly to each other; there are a few components in between them. If the overall connection drops, it doesn't necessarily mean that the socket will drop, so we have to rely on options like timeouts.

@Gtharan10

Gtharan10 commented Apr 17, 2024

This setup is currently deployed on an ESXi host. We have three nodes deployed independently, and we have been able to reproduce the issue by continuously streaming across the nodes.

The actual flow looks like this:
UDP -> Log Forwarder -> Forward aggregator -> OpenSearch

I have three questions:

  1. If the forwarder gets stuck in the middle, why does the UDP plugin also stop receiving data? Only heartbeats are sent to the other nodes.
  2. Are there any other steps to produce dumps for the Ruby code to check what is happening in real-time?
  3. Are there any limitations regarding the amount of data that can be continuously transmitted by the forwarder?
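
On the second question, one generic Ruby technique is to dump the backtrace of every live thread, which shows where a stuck thread is currently executing. The sketch below is independent of Fluentd. `thread_dump` is a hypothetical helper and the sleeping worker only stands in for a stuck flush thread. Fluentd's documentation also describes a signal-triggered dump based on the sigdump gem, which may be the more practical route on a running instance.

```ruby
# Build a text dump of every live thread and where it is currently executing.
def thread_dump
  Thread.list.map do |thread|
    frames = thread.backtrace || []  # nil if the thread died in the meantime
    header = "Thread #{thread.object_id} status=#{thread.status.inspect}"
    ([header] + frames.map { |frame| "  #{frame}" }).join("\n")
  end.join("\n\n")
end

worker = Thread.new { sleep }  # stand-in for a stuck flush thread
sleep 0.1                      # give the worker time to start

dump = thread_dump
puts dump

worker.kill
```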

@lkwiatek-sc

I'm observing the same issue in our setup on macOS; it usually occurs when the MacBook lid is closed. Based on the logs, I noticed that the MacBook wakes up from time to time to do certain things, and during these wake-ups Fluentd is more likely to enter the infinite loop in establish_connection.

I'm using TLS, with heartbeat disabled.

I've added some extra error logging to the rescue IO::WaitReadable => e clause. During normal operation the rescue clause triggers from time to time (usually no more than 10 retries):

2024-04-22 16:31:31 +0200 [warn]: #0 IO::WaitReadable host="..." port=12345 retry_count=2 error_class=OpenSSL::SSL::SSLErrorWaitReadable error="read would block"
2024-04-22 16:31:31 +0200 [warn]: #0 IO::WaitReadable host="..." port=12345 retry_count=4 error_class=OpenSSL::SSL::SSLErrorWaitReadable error="read would block"
2024-04-22 16:31:31 +0200 [warn]: #0 IO::WaitReadable host="..." port=12345 retry_count=5 error_class=OpenSSL::SSL::SSLErrorWaitReadable error="read would block"
2024-04-22 16:31:33 +0200 [warn]: #0 IO::WaitReadable host="..." port=12345 retry_count=1 error_class=OpenSSL::SSL::SSLErrorWaitReadable error="read would block"
2024-04-22 16:31:33 +0200 [warn]: #0 IO::WaitReadable host="..." port=12345 retry_count=2 error_class=OpenSSL::SSL::SSLErrorWaitReadable error="read would block"
2024-04-22 16:31:33 +0200 [warn]: #0 IO::WaitReadable host="..." port=12345 retry_count=4 error_class=OpenSSL::SSL::SSLErrorWaitReadable error="read would block"
2024-04-22 16:31:33 +0200 [warn]: #0 IO::WaitReadable host="..." port=12345 retry_count=5 error_class=OpenSSL::SSL::SSLErrorWaitReadable error="read would block"
2024-04-22 16:31:33 +0200 [warn]: #0 IO::WaitReadable host="..." port=12345 retry_count=6 error_class=OpenSSL::SSL::SSLErrorWaitReadable error="read would block"

When it enters the infinite loop, the error stays the same but the retry count increases forever:

2024-04-23 09:47:03 +0200 [warn]: #0 IO::WaitReadable host="..." port=12345 retry_count=30437 error_class=OpenSSL::SSL::SSLErrorWaitReadable error="read would block"
2024-04-23 09:47:03 +0200 [warn]: #0 IO::WaitReadable host="..." port=12345 retry_count=30438 error_class=OpenSSL::SSL::SSLErrorWaitReadable error="read would block"
2024-04-23 09:47:03 +0200 [warn]: #0 IO::WaitReadable host="..." port=12345 retry_count=30439 error_class=OpenSSL::SSL::SSLErrorWaitReadable error="read would block"
2024-04-23 09:47:03 +0200 [warn]: #0 IO::WaitReadable host="..." port=12345 retry_count=30440 error_class=OpenSSL::SSL::SSLErrorWaitReadable error="read would block"
2024-04-23 09:47:03 +0200 [warn]: #0 IO::WaitReadable host="..." port=12345 retry_count=30441 error_class=OpenSSL::SSL::SSLErrorWaitReadable error="read would block"
2024-04-23 09:47:03 +0200 [warn]: #0 IO::WaitReadable host="..." port=12345 retry_count=30442 error_class=OpenSSL::SSL::SSLErrorWaitReadable error="read would block"
2024-04-23 09:47:03 +0200 [warn]: #0 IO::WaitReadable host="..." port=12345 retry_count=30443 error_class=OpenSSL::SSL::SSLErrorWaitReadable error="read would block"
2024-04-23 09:47:03 +0200 [warn]: #0 IO::WaitReadable host="..." port=12345 retry_count=30444 error_class=OpenSSL::SSL::SSLErrorWaitReadable error="read would block"
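
The error_class in these log lines is caught by a `rescue IO::WaitReadable` clause because Ruby's OpenSSL binding mixes `IO::WaitReadable` into `OpenSSL::SSL::SSLErrorWaitReadable`. A small standalone check, not taken from Fluentd:

```ruby
require "openssl"

error_class = OpenSSL::SSL::SSLErrorWaitReadable

# The TLS "read would block" error includes IO::WaitReadable, so a bare
# `rescue IO::WaitReadable` catches it just like a plain-socket EAGAIN.
caught =
  begin
    raise error_class, "read would block"
  rescue IO::WaitReadable => e
    e.class
  end

puts error_class.include?(IO::WaitReadable)  # true
puts caught                                  # OpenSSL::SSL::SSLErrorWaitReadable
```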

My current workaround probably doesn't address the actual issue (why read_nonblock keeps returning a retriable error indefinitely), but it is good enough to prevent the infinite loop:

612a613
>         retry_count = 0
627c628,636
<           rescue IO::WaitReadable
---
>           rescue IO::WaitReadable => e
>             # On MacOS under certain circumstances read_nonblock will infinitely raise OpenSSL::SSL::SSLErrorWaitReadable
>             # (error="read would block"). During normal operation retry_count usually does not exceed 10, thus we set the
>             # limit to 25 to be on the safe side.
>             if retry_count > 25
>               @log.warn "retry count over 25 times", host: @host, port: @port, "last error": e
>               disable!
>               break
>             end
630a640
>             retry_count += 1
