Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the failure to create UdpSocket asynchronously due to immediate polling #2157

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
101 changes: 56 additions & 45 deletions crates/proto/src/udp/udp_stream.rs
Expand Up @@ -235,8 +235,11 @@ pub(crate) struct NextRandomUdpSocket<S> {
bind_address: SocketAddr,
closure: UdpCreator<S>,
marker: PhantomData<S>,
state: Option<(usize, UdpSocketCreateFuture<S>)>,
}

type UdpSocketCreateFuture<S> = Pin<Box<dyn Send + Future<Output = io::Result<S>>>>;

impl<S: UdpSocket + 'static> NextRandomUdpSocket<S> {
/// Creates a future for randomly binding to a local socket address for client connections,
/// if no port is specified.
Expand All @@ -258,6 +261,7 @@ impl<S: UdpSocket + 'static> NextRandomUdpSocket<S> {
bind_address,
closure: Arc::new(|local_addr: _, _server_addr: _| S::bind(local_addr)),
marker: PhantomData,
state: None,
}
}
}
Expand All @@ -276,6 +280,7 @@ impl<S: DnsUdpSocket> NextRandomUdpSocket<S> {
bind_address,
closure: func,
marker: PhantomData,
state: None,
}
}
}
Expand All @@ -287,55 +292,61 @@ impl<S: DnsUdpSocket + Send> Future for NextRandomUdpSocket<S> {
/// if no port has been specified in bind_addr.
///
/// if there is no port available after 10 attempts, returns NotReady
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.bind_address.port() == 0 {
// Per RFC 6056 Section 3.2:
//
// As mentioned in Section 2.1, the dynamic ports consist of the range
// 49152-65535. However, ephemeral port selection algorithms should use
// the whole range 1024-65535.
let rand_port_range = Uniform::new_inclusive(1024_u16, u16::max_value());
let mut rand = rand::thread_rng();

for attempt in 0..10 {
let port = rand_port_range.sample(&mut rand);
let bind_addr = SocketAddr::new(self.bind_address.ip(), port);

// TODO: allow TTL to be adjusted...
// TODO: this immediate poll might be wrong in some cases...
match (*self.closure)(bind_addr, self.name_server)
.as_mut()
.poll(cx)
{
Poll::Ready(Ok(socket)) => {
debug!("created socket successfully");
return Poll::Ready(Ok(socket));
}
Poll::Ready(Err(err)) => match err.kind() {
io::ErrorKind::AddrInUse => {
debug!("unable to bind port, attempt: {}: {}", attempt, err);
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut attempt = 0;
loop {
let state = self.state.take();

match state {
None => {
let fut = if self.bind_address.port() == 0 {
// Per RFC 6056 Section 3.2:
//
// As mentioned in Section 2.1, the dynamic ports consist of the range
// 49152-65535. However, ephemeral port selection algorithms should use
// the whole range 1024-65535.
let rand_port_range = Uniform::new_inclusive(1024_u16, u16::max_value());
let mut rand = rand::thread_rng();

let port = rand_port_range.sample(&mut rand);
let bind_addr = SocketAddr::new(self.bind_address.ip(), port);

// TODO: allow TTL to be adjusted...
(*self.closure)(bind_addr, self.name_server)
} else {
// Use port that was specified in bind address.
(*self.closure)(self.bind_address, self.name_server)
};

self.state = Some((attempt + 1, fut));
continue;
}
Some((s, mut fut)) => {
attempt = s;
match fut.as_mut().poll(cx) {
Poll::Ready(Ok(socket)) => {
debug!("created socket successfully");
return Poll::Ready(Ok(socket));
}
_ => {
debug!("failed to bind port: {}", err);
return Poll::Ready(Err(err));
Poll::Ready(Err(err)) => match err.kind() {
io::ErrorKind::AddrInUse if self.bind_address.port() == 0 => {
debug!("unable to bind port, attempt: {}: {}", attempt, err);
// try next random port.
continue;
}
_ => {
debug!("failed to bind port: {}", err);
return Poll::Ready(Err(err));
}
},
Poll::Pending => {
self.state = Some((attempt, fut));
// returning NotReady here, perhaps the next poll there will be some more socket available.
return Poll::Pending;
}
},
Poll::Pending => debug!("unable to bind port, attempt: {}", attempt),
}
}
}

debug!("could not get next random port, delaying");

// TODO: because no interest is registered anywhere, we must awake.
cx.waker().wake_by_ref();

// returning NotReady here, perhaps the next poll there will be some more socket available.
Poll::Pending
} else {
// Use port that was specified in bind address.
(*self.closure)(self.bind_address, self.name_server)
.as_mut()
.poll(cx)
}
}
}
Expand Down