Skip to content

Commit

Permalink
net: allow customized I/O operations for TcpStream (#3888)
Browse files Browse the repository at this point in the history
  • Loading branch information
zonyitoo committed Jul 8, 2021
1 parent c6fbb9a commit c306bf8
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 0 deletions.
35 changes: 35 additions & 0 deletions tokio/src/net/tcp/stream.rs
Expand Up @@ -936,6 +936,41 @@ impl TcpStream {
.try_io(Interest::WRITABLE, || (&*self.io).write_vectored(bufs))
}

/// Try to perform IO operation from the socket using a user-provided IO operation.
///
/// If the socket is ready, the provided closure is called. The
/// closure should attempt to perform IO operation from the socket by manually calling the
/// appropriate syscall. If the operation fails because the socket is not
/// actually ready, then the closure should return a `WouldBlock` error and
/// the readiness flag is cleared. The return value of the closure is
/// then returned by `try_io`.
///
/// If the socket is not ready, then the closure is not called
/// and a `WouldBlock` error is returned.
///
/// The closure should only return a `WouldBlock` error if it has performed
/// an IO operation on the socket that failed due to the socket not being
/// ready. Returning a `WouldBlock` error in any other situation will
/// incorrectly clear the readiness flag, which can cause the socket to
/// behave incorrectly.
///
/// The closure should not perform the read operation using any of the
/// methods defined on the Tokio `TcpStream` type, as this will mess with
/// the readiness flag and can cause the socket to behave incorrectly.
///
/// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
///
/// [`readable()`]: TcpStream::readable()
/// [`writable()`]: TcpStream::writable()
/// [`ready()`]: TcpStream::ready()
pub fn try_io<R>(
&self,
interest: Interest,
f: impl FnOnce() -> io::Result<R>,
) -> io::Result<R> {
self.io.registration().try_io(interest, f)
}

/// Receives data on the socket from the remote address to which it is
/// connected, without removing that data from the queue. On success,
/// returns the number of bytes peeked.
Expand Down
35 changes: 35 additions & 0 deletions tokio/src/net/udp.rs
Expand Up @@ -1170,6 +1170,41 @@ impl UdpSocket {
.try_io(Interest::READABLE, || self.io.recv_from(buf))
}

/// Try to perform IO operation from the socket using a user-provided IO operation.
///
/// If the socket is ready, the provided closure is called. The
/// closure should attempt to perform IO operation from the socket by manually calling the
/// appropriate syscall. If the operation fails because the socket is not
/// actually ready, then the closure should return a `WouldBlock` error and
/// the readiness flag is cleared. The return value of the closure is
/// then returned by `try_io`.
///
/// If the socket is not ready, then the closure is not called
/// and a `WouldBlock` error is returned.
///
/// The closure should only return a `WouldBlock` error if it has performed
/// an IO operation on the socket that failed due to the socket not being
/// ready. Returning a `WouldBlock` error in any other situation will
/// incorrectly clear the readiness flag, which can cause the socket to
/// behave incorrectly.
///
/// The closure should not perform the read operation using any of the
/// methods defined on the Tokio `UdpSocket` type, as this will mess with
/// the readiness flag and can cause the socket to behave incorrectly.
///
/// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
///
/// [`readable()`]: UdpSocket::readable()
/// [`writable()`]: UdpSocket::writable()
/// [`ready()`]: UdpSocket::ready()
pub fn try_io<R>(
&self,
interest: Interest,
f: impl FnOnce() -> io::Result<R>,
) -> io::Result<R> {
self.io.registration().try_io(interest, f)
}

/// Receives data from the socket, without removing it from the input queue.
/// On success, returns the number of bytes read and the address from whence
/// the data came.
Expand Down
35 changes: 35 additions & 0 deletions tokio/src/net/unix/datagram/socket.rs
Expand Up @@ -1143,6 +1143,41 @@ impl UnixDatagram {
Ok((n, SocketAddr(addr)))
}

/// Try to perform IO operation from the socket using a user-provided IO operation.
///
/// If the socket is ready, the provided closure is called. The
/// closure should attempt to perform IO operation from the socket by manually calling the
/// appropriate syscall. If the operation fails because the socket is not
/// actually ready, then the closure should return a `WouldBlock` error and
/// the readiness flag is cleared. The return value of the closure is
/// then returned by `try_io`.
///
/// If the socket is not ready, then the closure is not called
/// and a `WouldBlock` error is returned.
///
/// The closure should only return a `WouldBlock` error if it has performed
/// an IO operation on the socket that failed due to the socket not being
/// ready. Returning a `WouldBlock` error in any other situation will
/// incorrectly clear the readiness flag, which can cause the socket to
/// behave incorrectly.
///
/// The closure should not perform the read operation using any of the
/// methods defined on the Tokio `UnixDatagram` type, as this will mess with
/// the readiness flag and can cause the socket to behave incorrectly.
///
/// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
///
/// [`readable()`]: UnixDatagram::readable()
/// [`writable()`]: UnixDatagram::writable()
/// [`ready()`]: UnixDatagram::ready()
pub fn try_io<R>(
&self,
interest: Interest,
f: impl FnOnce() -> io::Result<R>,
) -> io::Result<R> {
self.io.registration().try_io(interest, f)
}

/// Returns the local address that this socket is bound to.
///
/// # Examples
Expand Down
35 changes: 35 additions & 0 deletions tokio/src/net/unix/stream.rs
Expand Up @@ -653,6 +653,41 @@ impl UnixStream {
.try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
}

/// Try to perform IO operation from the socket using a user-provided IO operation.
///
/// If the socket is ready, the provided closure is called. The
/// closure should attempt to perform IO operation from the socket by manually calling the
/// appropriate syscall. If the operation fails because the socket is not
/// actually ready, then the closure should return a `WouldBlock` error and
/// the readiness flag is cleared. The return value of the closure is
/// then returned by `try_io`.
///
/// If the socket is not ready, then the closure is not called
/// and a `WouldBlock` error is returned.
///
/// The closure should only return a `WouldBlock` error if it has performed
/// an IO operation on the socket that failed due to the socket not being
/// ready. Returning a `WouldBlock` error in any other situation will
/// incorrectly clear the readiness flag, which can cause the socket to
/// behave incorrectly.
///
/// The closure should not perform the read operation using any of the
/// methods defined on the Tokio `UnixStream` type, as this will mess with
/// the readiness flag and can cause the socket to behave incorrectly.
///
/// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
///
/// [`readable()`]: UnixStream::readable()
/// [`writable()`]: UnixStream::writable()
/// [`ready()`]: UnixStream::ready()
pub fn try_io<R>(
&self,
interest: Interest,
f: impl FnOnce() -> io::Result<R>,
) -> io::Result<R> {
self.io.registration().try_io(interest, f)
}

/// Creates new `UnixStream` from a `std::os::unix::net::UnixStream`.
///
/// This function is intended to be used to wrap a UnixStream from the
Expand Down
70 changes: 70 additions & 0 deletions tokio/src/net/windows/named_pipe.rs
Expand Up @@ -723,6 +723,41 @@ impl NamedPipeServer {
.registration()
.try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
}

/// Try to perform IO operation from the socket using a user-provided IO operation.
///
/// If the socket is ready, the provided closure is called. The
/// closure should attempt to perform IO operation from the socket by manually calling the
/// appropriate syscall. If the operation fails because the socket is not
/// actually ready, then the closure should return a `WouldBlock` error and
/// the readiness flag is cleared. The return value of the closure is
/// then returned by `try_io`.
///
/// If the socket is not ready, then the closure is not called
/// and a `WouldBlock` error is returned.
///
/// The closure should only return a `WouldBlock` error if it has performed
/// an IO operation on the socket that failed due to the socket not being
/// ready. Returning a `WouldBlock` error in any other situation will
/// incorrectly clear the readiness flag, which can cause the socket to
/// behave incorrectly.
///
/// The closure should not perform the read operation using any of the
/// methods defined on the Tokio `NamedPipeServer` type, as this will mess with
/// the readiness flag and can cause the socket to behave incorrectly.
///
/// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
///
/// [`readable()`]: NamedPipeServer::readable()
/// [`writable()`]: NamedPipeServer::writable()
/// [`ready()`]: NamedPipeServer::ready()
pub fn try_io<R>(
&self,
interest: Interest,
f: impl FnOnce() -> io::Result<R>,
) -> io::Result<R> {
self.io.registration().try_io(interest, f)
}
}

impl AsyncRead for NamedPipeServer {
Expand Down Expand Up @@ -1343,6 +1378,41 @@ impl NamedPipeClient {
.registration()
.try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
}

/// Try to perform IO operation from the socket using a user-provided IO operation.
///
/// If the socket is ready, the provided closure is called. The
/// closure should attempt to perform IO operation from the socket by manually calling the
/// appropriate syscall. If the operation fails because the socket is not
/// actually ready, then the closure should return a `WouldBlock` error and
/// the readiness flag is cleared. The return value of the closure is
/// then returned by `try_io`.
///
/// If the socket is not ready, then the closure is not called
/// and a `WouldBlock` error is returned.
///
/// The closure should only return a `WouldBlock` error if it has performed
/// an IO operation on the socket that failed due to the socket not being
/// ready. Returning a `WouldBlock` error in any other situation will
/// incorrectly clear the readiness flag, which can cause the socket to
/// behave incorrectly.
///
/// The closure should not perform the read operation using any of the
/// methods defined on the Tokio `NamedPipeClient` type, as this will mess with
/// the readiness flag and can cause the socket to behave incorrectly.
///
/// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
///
/// [`readable()`]: NamedPipeClient::readable()
/// [`writable()`]: NamedPipeClient::writable()
/// [`ready()`]: NamedPipeClient::ready()
pub fn try_io<R>(
&self,
interest: Interest,
f: impl FnOnce() -> io::Result<R>,
) -> io::Result<R> {
self.io.registration().try_io(interest, f)
}
}

impl AsyncRead for NamedPipeClient {
Expand Down

2 comments on commit c306bf8

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark 'sync_mpsc'.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: c306bf8 Previous: c6fbb9a Ratio
send_large 66725 ns/iter (± 8778) 28018 ns/iter (± 3241) 2.38

This comment was automatically generated by workflow using github-action-benchmark.

CC: @tokio-rs/maintainers

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark 'sync_mpsc'.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: c306bf8 Previous: c6fbb9a Ratio
send_large 66171 ns/iter (± 9461) 28018 ns/iter (± 3241) 2.36

This comment was automatically generated by workflow using github-action-benchmark.

CC: @tokio-rs/maintainers

Please sign in to comment.