"unable to enqueue message" when AsyncClient<UdpResponse> sends too many requests #1276

Closed
LEXUGE opened this issue Nov 8, 2020 · 36 comments · Fixed by #1356
Closed

"unable to enqueue message" when AsyncClient<UdpResponse> sends too many requests #1276

LEXUGE opened this issue Nov 8, 2020 · 36 comments · Fixed by #1356

Comments

LEXUGE (Contributor) commented Nov 8, 2020

Describe the bug
During load testing at around 890 qps, I found:

2020-11-08 18:24:11,428 DEBUG [trust_dns_proto::xfer] enqueueing message: [Query { name: Name { is_fqdn: true, labels: [baidu, com] }, query_type: A, query_class: IN }]
2020-11-08 18:24:11,428 DEBUG [trust_dns_proto::xfer] unable to enqueue message
2020-11-08 18:24:11,428 WARN  [droute::router] Upstream encountered error: could not send request, returning SERVFAIL
2020-11-08 18:24:11,428 DEBUG [trust_dns_proto::xfer::dns_exchange] io_stream is done, shutting down

where droute is the name of my project.

To Reproduce
It's hard to reduce this to a minimal reproducible code snippet. However, my setup does the following:

  1. Receive DNS queries in an event loop, spawning a new task for each query.
  2. In each task, clone the AsyncClient and send the query through it.

There is only one AsyncClient, but it is cloned several times.
I tested with a delay between requests: 1 millisecond doesn't help (the "unable to enqueue" error persists), and 2 milliseconds results in timeouts (I set a timeout of roughly 2 seconds per query).
I also tried using multiple AsyncClients, which results in a high rate of timeouts.

The related code can be found here.

Expected behavior
No error

System:

  • OS: [e.g. macOS]
  • Architecture: [e.g. x86_64]
  • Version [e.g. 22]
  • rustc version: [e.g. 1.28]

Version:
Crate: client
Version: 0.19.5

Additional context
I also tried tokio-compat; the issue doesn't seem to occur with it.

LEXUGE (Contributor, Author) commented Nov 8, 2020

It might be caused by CHANNEL_BUFFER_SIZE. Can we have an option to use an unbounded mpsc channel?

LEXUGE (Contributor, Author) commented Nov 9, 2020

I found out this is not related to the buffer size; rather, I was holding an async Mutex across the send, which causes the problem. I don't know why holding the async Mutex across the await causes it, but it is solved.

LEXUGE closed this as completed Nov 9, 2020
djc (Collaborator) commented Nov 9, 2020

Can you explain a bit more about the changes that fixed the problem you were seeing? It sounds like there might be a potential deadlock in trust-dns that you triggered?

LEXUGE (Contributor, Author) commented Nov 9, 2020

Sure, here is the problematic code. I have, roughly, an async Mutex<HashMap<usize, AsyncClient<UdpResponse>>>, and I locked it across the send().await operation. After running error-free for a while, it starts to time out, and finally gives me unable to enqueue: couldn't find receivers or receiver is gone (I don't remember exactly).

Either locking the async Mutex only before and after the send operation (not across it) or using a sync Mutex mitigates the issue. In conclusion, I can't hold a MutexGuard<AsyncClient> across send().await, otherwise the issue occurs.
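
A minimal sketch of that mitigation, assuming the HashMap of clients described above and the 0.20-style AsyncClient used later in this thread (the send_query helper and its signature are illustrative, not part of droute or trust-dns):

use std::collections::HashMap;
use tokio::sync::Mutex;
use trust_dns_client::client::AsyncClient;
use trust_dns_proto::op::Message;
use trust_dns_proto::xfer::dns_handle::DnsHandle;

// Illustrative sketch: lock only long enough to clone the client, then drop
// the guard before awaiting the send, instead of holding it across the await.
async fn send_query(
    clients: &Mutex<HashMap<usize, AsyncClient>>,
    key: usize,
    msg: Message,
) -> Option<Message> {
    let mut client = {
        let guard = clients.lock().await;
        guard.get(&key)?.clone()
        // the guard is dropped at the end of this block, before the await below
    };
    Some(Message::from(client.send(msg).await.ok()?))
}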


Also, whether the mpsc channel is bounded or unbounded is not related to this issue: I tried an unbounded channel and got the same error. In addition, both 0.19.5 and 0.20-alpha3 give me the issue with the same error: receiver is gone.

The HashMap<usize, AsyncClient> is a mapping between the configurations the user specified and the clients; it only has 4 or 5 keys.

djc (Collaborator) commented Nov 9, 2020

Why are you holding on to so many separate AsyncClient instances in the first place?

LEXUGE (Contributor, Author) commented Nov 9, 2020

No, I am not. I created a HashMap to "cache" an AsyncClient per configuration, and I clone that same AsyncClient for each incoming query to avoid the cost of creating an extra client.

LEXUGE reopened this Nov 12, 2020
LEXUGE (Contributor, Author) commented Nov 29, 2020

I tested the same code with Tokio 0.3 on the main branch; it seems it is able to enqueue messages, but it has to wait for a long time. Can we change the channel to be unbounded, or have an option for that?

djc (Collaborator) commented Nov 29, 2020

Effectively this is the AsyncClient applying backpressure towards your application, because the internal queue isn't being emptied fast enough. I think the solution here is (a) investigating what the performance problem is on the other side of the queue, or (b) keeping a buffer in your application. We could make the channel larger, but in the limit that would just mean that your queries start to time out.
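
One way to keep such a buffer on the application side, sketched here assuming the tokio 1.x Semaphore API and the AsyncClient usage shown later in this thread (the limit of 128 and the helper name are arbitrary examples, not trust-dns recommendations):

use std::sync::Arc;
use tokio::sync::Semaphore;
use trust_dns_client::client::AsyncClient;
use trust_dns_proto::op::Message;
use trust_dns_proto::xfer::dns_handle::DnsHandle;

// Illustrative sketch: cap the number of in-flight queries so the application
// slows down before the AsyncClient's internal queue fills up.
async fn forward_with_limit(
    limit: Arc<Semaphore>, // e.g. Arc::new(Semaphore::new(128)), shared by all tasks
    mut client: AsyncClient,
    msg: Message,
) -> Option<Message> {
    // Wait here if too many queries are already in flight.
    let _permit = limit.acquire_owned().await.ok()?;
    Some(Message::from(client.send(msg).await.ok()?))
    // _permit is dropped on return, freeing one slot for the next query.
}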

LEXUGE (Contributor, Author) commented Nov 29, 2020

Does having multiple AsyncClients help? I suppose each client has its own queue (background)?

djc (Collaborator) commented Nov 29, 2020

If you set up multiple async clients separately, that should mitigate the problem, yes. But again, that just means you're potentially opening multiple connections to the same backend servers, which might cause your client to get rate-limited sooner.

It would be useful to know what the bottleneck in draining the queue is, in your application.

LEXUGE (Contributor, Author) commented Nov 29, 2020

Thanks, I will investigate further to see where the exact bottleneck is.

djc (Collaborator) commented Nov 29, 2020

dcompass looks like a cool project, BTW!

Is there a particular reason you're using trust-dns-client here rather than trust-dns-resolver?

LEXUGE (Contributor, Author) commented Nov 29, 2020

To tell you the truth, the first prototype used the resolver because I didn't understand how the client works. However, that meant I had to take the resolved IPs and build a new response packet to send back, which is cumbersome. Using the client means I only need to forward the query and send back the answers, which feels more natural.

LEXUGE (Contributor, Author) commented Nov 29, 2020

It seems it was only a network issue. It can now top 3000 qps!

djc (Collaborator) commented Nov 30, 2020

So this can be closed again, right?

LEXUGE (Contributor, Author) commented Nov 30, 2020

So this can be closed again, right?

I think so. However, regarding the queue size, I hope it could be increased or exposed as a configurable parameter.

djc (Collaborator) commented Nov 30, 2020

I don't really see a good reason to do that: if there's a network delay for example, it's better for your application to become aware of that sooner rather than later (through timeouts).

LEXUGE (Contributor, Author) commented Jan 14, 2021

Update:
Although backpressure is expected, on 0.20 the underlying channel seems to stay full permanently once it has been filled, i.e. it's irrecoverable. I suspect this is a bug.

djc (Collaborator) commented Jan 14, 2021

Would be nice to have a minimal reproduction that demonstrates the issue.

LEXUGE (Contributor, Author) commented Jan 14, 2021

use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::UdpSocket;
use trust_dns_client::{client::AsyncClient, udp::UdpClientStream};
use trust_dns_proto::op::Message;
use trust_dns_proto::xfer::dns_handle::DnsHandle;

#[tokio::main]
async fn main() {
    // Bind a UDP socket for receiving incoming DNS queries
    let socket = Arc::new(
        UdpSocket::bind("127.0.0.1:2053".parse::<SocketAddr>().unwrap())
            .await
            .unwrap(),
    );

    // Create one shared client for 8.8.8.8 and spawn its background task.
    let client = {
        let stream = UdpClientStream::<UdpSocket>::new("8.8.8.8:53".parse().unwrap());
        let (client, bg) = AsyncClient::connect(stream).await.unwrap();
        tokio::spawn(bg);
        client
    };

    // Event loop
    loop {
        let mut buf = [0; 1232];

        let (len, src) = socket.recv_from(&mut buf).await.unwrap();

        // Parse only the bytes that were actually received.
        let msg = Message::from_vec(&buf[..len]).unwrap();
        let socket = socket.clone();

        // Clone the shared client and answer this query in its own task.
        let mut client = client.clone();
        tokio::spawn(async move {
            let id = msg.id();
            let mut r = Message::from(client.send(msg).await.unwrap());
            r.set_id(id);
            socket.send_to(&r.to_vec().unwrap(), src).await
        });
    }
}

I believe this is the minimal sample. However, because my network environment is "good" now, I cannot really reproduce the problem with it (it works in the current environment).

djc (Collaborator) commented Jan 14, 2021

I don't think it would be a bug if that fails. The application needs to be able to "handle" backpressure from the underlying library, by slowing down the rate of requests if necessary. The channel always being full just means the receiver cannot process requests as fast as the sender is trying to send them. If that happens with your application, you should find some way for your application to "handle" the backpressure, for example by temporarily delaying further requests or by load balancing across another channel.

Maybe do some reading on backpressure if you're not familiar with it:

https://medium.com/@jayphelps/backpressure-explained-the-flow-of-data-through-software-2350b3e77ce7
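
For illustration, one crude way to apply such a delay at the call site, assuming the tokio 1.x timer API and the AsyncClient usage from the sample above (the retry count and the 50 ms pause are arbitrary example values):

use std::time::Duration;
use tokio::time::sleep;
use trust_dns_client::client::AsyncClient;
use trust_dns_proto::op::Message;
use trust_dns_proto::xfer::dns_handle::DnsHandle;

// Illustrative sketch: if a send fails (for example because the queue is full),
// pause briefly so the background task can drain its queue, then retry.
async fn send_with_retry(mut client: AsyncClient, msg: Message) -> Option<Message> {
    for _ in 0..3 {
        match client.send(msg.clone()).await {
            Ok(resp) => return Some(Message::from(resp)),
            Err(_) => sleep(Duration::from_millis(50)).await,
        }
    }
    None
}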

LEXUGE (Contributor, Author) commented Jan 14, 2021

However, the situation here is that the channel stays full even though the sender has stopped sending any messages for a considerable amount of time. That is what I didn't expect. I think trust-dns should be able to empty the channel somehow, whether by cancelling, dropping, or handling it internally.

LEXUGE (Contributor, Author) commented Jan 14, 2021

[screenshot: debug log output]
For example, after I stressed the requester out, the channel is still full even after 1 minute, when I tested with a query for www.example.cn (I tested longer intervals as well).

But I still cannot reproduce it with that minimal sample. Maybe this issue is related to my codebase; however, I don't see any substantial difference between the sample and my codebase.

djc (Collaborator) commented Jan 15, 2021

Okay, that does sound like a bug. Without some way to reproduce it or a more detailed problem report, I'm not sure I have any avenues for fixing it, though.

LEXUGE (Contributor, Author) commented Jan 16, 2021

[screenshots: debug log output]
I tweaked the logging a little and found this bizarre situation.
This might be related to my codebase; I am not sure whether I misused UdpClientStream. What I do is create a client once and clone it for later use:

struct Udp {
    // The single AsyncClient created in new() and cloned for each request.
    client: AsyncClient,
}

impl Udp {
    pub async fn new(addr: SocketAddr) -> Result<Self> {
        let stream = UdpClientStream::<UdpSocket>::new(addr);
        let (client, bg) = AsyncClient::connect(stream).await?;
        tokio::spawn(bg);
        Ok(Self { client })
    }
}

#[async_trait]
impl ClientPool for Udp {
    async fn get_client(&self) -> Result<AsyncClient> {
        Ok(self.client.clone())
    }
}

It's weird to see the receiver being dropped, though.

LEXUGE (Contributor, Author) commented Jan 16, 2021

It seems that before the receiver was gone, there was always an error like failed to associate send_message response to the sender. This may cause the background task and the receiver to be dropped, resulting in this issue.

LEXUGE (Contributor, Author) commented Jan 17, 2021

The root cause of this issue is that the background task encountered some error and exited, and the client then tried to send a message but failed because the background task no longer existed. My workaround is to create a new AsyncClient whenever a send fails.
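
A sketch of that workaround, reusing the connection setup from the earlier sample (the helper names are examples, not trust-dns APIs, and error handling is simplified):

use std::net::SocketAddr;
use tokio::net::UdpSocket;
use trust_dns_client::{client::AsyncClient, udp::UdpClientStream};
use trust_dns_proto::op::Message;
use trust_dns_proto::xfer::dns_handle::DnsHandle;

// Same connection setup as in the earlier sample: spawn the background task
// and hand back the client half.
async fn connect(addr: SocketAddr) -> AsyncClient {
    let stream = UdpClientStream::<UdpSocket>::new(addr);
    let (client, bg) = AsyncClient::connect(stream).await.unwrap();
    tokio::spawn(bg);
    client
}

// Illustrative sketch: if a send fails (e.g. because the background task has
// exited), replace the client with a freshly connected one and retry once.
async fn send_or_reconnect(
    client: &mut AsyncClient,
    addr: SocketAddr,
    msg: Message,
) -> Option<Message> {
    match client.send(msg.clone()).await {
        Ok(resp) => Some(Message::from(resp)),
        Err(_) => {
            *client = connect(addr).await;
            client.send(msg).await.ok().map(Message::from)
        }
    }
}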

LEXUGE closed this as completed Jan 17, 2021
djc (Collaborator) commented Jan 17, 2021

Did you figure out what error the background task encountered? We should maybe make the background task more resilient to failure if the error is at all recoverable.

djc reopened this Jan 17, 2021
LEXUGE (Contributor, Author) commented Jan 17, 2021

Mainly failed to associate send_message response to the sender.
For HTTPS clients, there may also be io_stream hit an error, shutting down: not an error (approximately; it happens as the underlying streams receive a CloseNotify).

bluejekyll (Member) commented

Thanks for all the research on this. I'm wondering if the issue here is that we are hitting a network error, but the request future waiting for the result isn't able to be notified, b/c there is no ID associated back to the stream (thus forcing the timeout to expire before resolving itself).

Do we need a better method of binding the request id and the IO stream together, such that when the IO stream fails we can immediately return a result to the channel waiting for a response? (I'm guessing this might be the issue based on @LEXUGE's research.)

LEXUGE (Contributor, Author) commented Jan 18, 2021

There are two separate cases.
One is that the background task encounters some PERMANENT error; in that case the AsyncClient should make the caller aware of the situation, or perhaps tear itself down, since a client without a background task makes no sense.
The other case is that the AsyncClient is simply dropped; in that case, I expect the background task to exit by itself (as it does currently).

However, if the error is transient (like a network issue, if the background task is able to tell), I expect the background task to survive those errors.

djc (Collaborator) commented Jan 18, 2021

The behavior in the case of failing to associate seems wrong; see if #1356 improves the situation?

For the other one, is that the exact message? I cannot find where the "not an error" phrase would have come from.

LEXUGE (Contributor, Author) commented Jan 18, 2021

The behavior in the case of failing to associate seems wrong; see if #1356 improves the situation?

For the other one, is that the exact message? I cannot find where the "not an error" phrase would have come from.

It is not exact. The exact message (for DoH clients) is io_stream hit an error, shutting down: h2 stream errored: protocol error: not a result of an error. I suppose this is actually not an error, as it is part of the action h2 takes on CloseNotify.

LEXUGE (Contributor, Author) commented Jan 18, 2021

And for failing to associate, that PR doesn't seem to be the right behavior either, as I pointed out. Currently, if there are multiple AsyncClients and a single DnsExchangeBackground, and one of the AsyncClients sends a query but then goes away, the DnsExchangeBackground dies. However, with that PR, even if all the clients are dropped, the background task still carries on, which isn't quite right either.

Can we figure out how to let the background task exit if and only if all clients are dropped or some irrecoverable error is encountered (ideally letting all the clients know in that case)?

djc (Collaborator) commented Jan 18, 2021

That's not how I understand the change I made. It seems to me that the change only lets the task go on if sending a response to a receiver fails. However, the DnsExchangeBackground also polls the outbound_messages queue of requests, and if that queue is exhausted (outbound_messages.as_mut().poll_next(cx) returns Poll::Ready(None)) the underlying stream will shut down, and the DnsExchangeBackground will return Poll::Ready(Ok(())) once shutdown is complete. This should take care of the scenario you describe.

LEXUGE (Contributor, Author) commented Jan 18, 2021

Thanks, that seems right to me now.
