Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

net: allow customized I/O operations for TcpStream (#3873) #3888

Merged
merged 11 commits into from Jul 8, 2021
35 changes: 35 additions & 0 deletions tokio/src/net/tcp/stream.rs
Expand Up @@ -936,6 +936,41 @@ impl TcpStream {
.try_io(Interest::WRITABLE, || (&*self.io).write_vectored(bufs))
}

/// Try to perform IO operation from the socket using a user-provided IO operation.
///
/// If the socket is ready, the provided closure is called. The
/// closure should attempt to perform IO operation from the socket by manually calling the
/// appropriate syscall. If the operation fails because the socket is not
/// actually ready, then the closure should return a `WouldBlock` error and
/// the readiness flag is cleared. The return value of the closure is
/// then returned by `try_io`.
///
/// If the socket is not ready, then the closure is not called
/// and a `WouldBlock` error is returned.
///
/// The closure should only return a `WouldBlock` error if it has performed
/// an IO operation on the socket that failed due to the socket not being
/// ready. Returning a `WouldBlock` error in any other situation will
/// incorrectly clear the readiness flag, which can cause the socket to
/// behave incorrectly.
///
/// The closure should not perform the read operation using any of the
/// methods defined on the Tokio `TcpStream` type, as this will mess with
/// the readiness flag and can cause the socket to behave incorrectly.
///
/// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
///
/// [`readable()`]: TcpStream::readable()
/// [`writable()`]: TcpStream::writable()
/// [`ready()`]: TcpStream::ready()
pub fn try_io<R>(
&self,
interest: Interest,
f: impl FnOnce() -> io::Result<R>,
) -> io::Result<R> {
self.io.registration().try_io(interest, f)
}

/// Receives data on the socket from the remote address to which it is
/// connected, without removing that data from the queue. On success,
/// returns the number of bytes peeked.
Expand Down
35 changes: 35 additions & 0 deletions tokio/src/net/udp.rs
Expand Up @@ -1170,6 +1170,41 @@ impl UdpSocket {
.try_io(Interest::READABLE, || self.io.recv_from(buf))
}

/// Try to perform IO operation from the socket using a user-provided IO operation.
///
/// If the socket is ready, the provided closure is called. The
/// closure should attempt to perform IO operation from the socket by manually calling the
/// appropriate syscall. If the operation fails because the socket is not
/// actually ready, then the closure should return a `WouldBlock` error and
/// the readiness flag is cleared. The return value of the closure is
/// then returned by `try_io`.
///
/// If the socket is not ready, then the closure is not called
/// and a `WouldBlock` error is returned.
///
/// The closure should only return a `WouldBlock` error if it has performed
/// an IO operation on the socket that failed due to the socket not being
/// ready. Returning a `WouldBlock` error in any other situation will
/// incorrectly clear the readiness flag, which can cause the socket to
/// behave incorrectly.
///
/// The closure should not perform the read operation using any of the
/// methods defined on the Tokio `UdpSocket` type, as this will mess with
/// the readiness flag and can cause the socket to behave incorrectly.
///
/// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
///
/// [`readable()`]: UdpSocket::readable()
/// [`writable()`]: UdpSocket::writable()
/// [`ready()`]: UdpSocket::ready()
pub fn try_io<R>(
&self,
interest: Interest,
f: impl FnOnce() -> io::Result<R>,
) -> io::Result<R> {
self.io.registration().try_io(interest, f)
}

/// Receives data from the socket, without removing it from the input queue.
/// On success, returns the number of bytes read and the address from whence
/// the data came.
Expand Down
35 changes: 35 additions & 0 deletions tokio/src/net/unix/datagram/socket.rs
Expand Up @@ -1143,6 +1143,41 @@ impl UnixDatagram {
Ok((n, SocketAddr(addr)))
}

/// Try to perform IO operation from the socket using a user-provided IO operation.
///
/// If the socket is ready, the provided closure is called. The
/// closure should attempt to perform IO operation from the socket by manually calling the
/// appropriate syscall. If the operation fails because the socket is not
/// actually ready, then the closure should return a `WouldBlock` error and
/// the readiness flag is cleared. The return value of the closure is
/// then returned by `try_io`.
///
/// If the socket is not ready, then the closure is not called
/// and a `WouldBlock` error is returned.
///
/// The closure should only return a `WouldBlock` error if it has performed
/// an IO operation on the socket that failed due to the socket not being
/// ready. Returning a `WouldBlock` error in any other situation will
/// incorrectly clear the readiness flag, which can cause the socket to
/// behave incorrectly.
///
/// The closure should not perform the read operation using any of the
/// methods defined on the Tokio `UnixDatagram` type, as this will mess with
/// the readiness flag and can cause the socket to behave incorrectly.
///
/// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
///
/// [`readable()`]: UnixDatagram::readable()
/// [`writable()`]: UnixDatagram::writable()
/// [`ready()`]: UnixDatagram::ready()
pub fn try_io<R>(
&self,
interest: Interest,
f: impl FnOnce() -> io::Result<R>,
) -> io::Result<R> {
self.io.registration().try_io(interest, f)
}

/// Returns the local address that this socket is bound to.
///
/// # Examples
Expand Down
35 changes: 35 additions & 0 deletions tokio/src/net/unix/stream.rs
Expand Up @@ -653,6 +653,41 @@ impl UnixStream {
.try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
}

/// Try to perform IO operation from the socket using a user-provided IO operation.
///
/// If the socket is ready, the provided closure is called. The
/// closure should attempt to perform IO operation from the socket by manually calling the
/// appropriate syscall. If the operation fails because the socket is not
/// actually ready, then the closure should return a `WouldBlock` error and
/// the readiness flag is cleared. The return value of the closure is
/// then returned by `try_io`.
///
/// If the socket is not ready, then the closure is not called
/// and a `WouldBlock` error is returned.
///
/// The closure should only return a `WouldBlock` error if it has performed
/// an IO operation on the socket that failed due to the socket not being
/// ready. Returning a `WouldBlock` error in any other situation will
/// incorrectly clear the readiness flag, which can cause the socket to
/// behave incorrectly.
///
/// The closure should not perform the read operation using any of the
/// methods defined on the Tokio `UnixStream` type, as this will mess with
/// the readiness flag and can cause the socket to behave incorrectly.
///
/// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
///
/// [`readable()`]: UnixStream::readable()
/// [`writable()`]: UnixStream::writable()
/// [`ready()`]: UnixStream::ready()
pub fn try_io<R>(
&self,
interest: Interest,
f: impl FnOnce() -> io::Result<R>,
) -> io::Result<R> {
self.io.registration().try_io(interest, f)
}

/// Creates new `UnixStream` from a `std::os::unix::net::UnixStream`.
///
/// This function is intended to be used to wrap a UnixStream from the
Expand Down
70 changes: 70 additions & 0 deletions tokio/src/net/windows/named_pipe.rs
Expand Up @@ -723,6 +723,41 @@ impl NamedPipeServer {
.registration()
.try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
}

/// Try to perform IO operation from the socket using a user-provided IO operation.
///
/// If the socket is ready, the provided closure is called. The
/// closure should attempt to perform IO operation from the socket by manually calling the
/// appropriate syscall. If the operation fails because the socket is not
/// actually ready, then the closure should return a `WouldBlock` error and
/// the readiness flag is cleared. The return value of the closure is
/// then returned by `try_io`.
///
/// If the socket is not ready, then the closure is not called
/// and a `WouldBlock` error is returned.
///
/// The closure should only return a `WouldBlock` error if it has performed
/// an IO operation on the socket that failed due to the socket not being
/// ready. Returning a `WouldBlock` error in any other situation will
/// incorrectly clear the readiness flag, which can cause the socket to
/// behave incorrectly.
///
/// The closure should not perform the read operation using any of the
/// methods defined on the Tokio `NamedPipeServer` type, as this will mess with
/// the readiness flag and can cause the socket to behave incorrectly.
///
/// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
///
/// [`readable()`]: NamedPipeServer::readable()
/// [`writable()`]: NamedPipeServer::writable()
/// [`ready()`]: NamedPipeServer::ready()
pub fn try_io<R>(
&self,
interest: Interest,
f: impl FnOnce() -> io::Result<R>,
) -> io::Result<R> {
self.io.registration().try_io(interest, f)
}
}

impl AsyncRead for NamedPipeServer {
Expand Down Expand Up @@ -1343,6 +1378,41 @@ impl NamedPipeClient {
.registration()
.try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
}

/// Try to perform IO operation from the socket using a user-provided IO operation.
///
/// If the socket is ready, the provided closure is called. The
/// closure should attempt to perform IO operation from the socket by manually calling the
/// appropriate syscall. If the operation fails because the socket is not
/// actually ready, then the closure should return a `WouldBlock` error and
/// the readiness flag is cleared. The return value of the closure is
/// then returned by `try_io`.
///
/// If the socket is not ready, then the closure is not called
/// and a `WouldBlock` error is returned.
///
/// The closure should only return a `WouldBlock` error if it has performed
/// an IO operation on the socket that failed due to the socket not being
/// ready. Returning a `WouldBlock` error in any other situation will
/// incorrectly clear the readiness flag, which can cause the socket to
/// behave incorrectly.
///
/// The closure should not perform the read operation using any of the
/// methods defined on the Tokio `NamedPipeClient` type, as this will mess with
/// the readiness flag and can cause the socket to behave incorrectly.
///
/// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
///
/// [`readable()`]: NamedPipeClient::readable()
/// [`writable()`]: NamedPipeClient::writable()
/// [`ready()`]: NamedPipeClient::ready()
pub fn try_io<R>(
&self,
interest: Interest,
f: impl FnOnce() -> io::Result<R>,
) -> io::Result<R> {
self.io.registration().try_io(interest, f)
}
}

impl AsyncRead for NamedPipeClient {
Expand Down