From 6a28230563eafb511e63c2ce739ff496f9878b2a Mon Sep 17 00:00:00 2001 From: cssivision Date: Sat, 26 Dec 2020 21:14:29 +0800 Subject: [PATCH] net: Add try_read_buf and try_recv_buf --- tokio/src/io/poll_evented.rs | 32 +++++++++++++++++++++++++++ tokio/src/net/tcp/stream.rs | 10 +++++++++ tokio/src/net/udp.rs | 10 +++++++++ tokio/src/net/unix/datagram/socket.rs | 10 +++++++++ tokio/src/net/unix/stream.rs | 10 +++++++++ 5 files changed, 72 insertions(+) diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs index 0ecdb18066f..df7d493b1c1 100644 --- a/tokio/src/io/poll_evented.rs +++ b/tokio/src/io/poll_evented.rs @@ -5,6 +5,8 @@ use std::fmt; use std::io; use std::ops::Deref; +use bytes::BufMut; + cfg_io_driver! { /// Associates an I/O resource that implements the [`std::io::Read`] and/or /// [`std::io::Write`] traits with the reactor that drives it. @@ -164,6 +166,36 @@ feature! { Poll::Ready(Ok(())) } + // Safety: The caller must ensure that `E` can read into uninitialized memory + pub(crate) unsafe fn read_buf<'a, B: BufMut>( + &'a self, + buf: &'a mut B, + ) -> io::Result + where + &'a E: io::Read + 'a, + { + use std::io::Read; + + if !buf.has_remaining_mut() { + return Ok(0); + } + + let dst = buf.chunk_mut(); + let dst = &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit]); + let mut buf = ReadBuf::uninit(dst); + + let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit] as *mut [u8]); + let n = self.io.as_ref().unwrap().read(b)?; + + // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the + // buffer. + buf.assume_init(n); + + buf.advance(n); + + Ok(buf.filled().len()) + } + pub(crate) fn poll_write<'a>(&'a self, cx: &mut Context<'_>, buf: &[u8]) -> Poll> where &'a E: io::Write + 'a, diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index d4bfba4606a..debb8934584 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -17,6 +17,8 @@ use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; +use bytes::BufMut; + cfg_net! { /// A TCP stream between a local and a remote socket. /// @@ -559,6 +561,14 @@ impl TcpStream { .try_io(Interest::READABLE, || (&*self.io).read(buf)) } + /// Try to read data from the stream into the provided buffer, advancing the + /// buffer's internal cursor, returning how many bytes were read. + pub fn try_read_buf(&self, buf: &mut B) -> io::Result { + self.io + .registration() + .try_io(Interest::READABLE, || unsafe { self.io.read_buf(buf) }) + } + /// Wait for the socket to become writable. /// /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually diff --git a/tokio/src/net/udp.rs b/tokio/src/net/udp.rs index 23abe98e457..da8bd470467 100644 --- a/tokio/src/net/udp.rs +++ b/tokio/src/net/udp.rs @@ -7,6 +7,8 @@ use std::io; use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::task::{Context, Poll}; +use bytes::BufMut; + cfg_net! { /// A UDP socket /// @@ -683,6 +685,14 @@ impl UdpSocket { .try_io(Interest::READABLE, || self.io.recv(buf)) } + /// Try to receive data from the stream into the provided buffer, advancing the + /// buffer's internal cursor, returning how many bytes were read. + pub fn try_recv_buf(&self, buf: &mut B) -> io::Result { + self.io + .registration() + .try_io(Interest::READABLE, || unsafe { self.io.read_buf(buf) }) + } + /// Sends data on the socket to the given address. On success, returns the /// number of bytes written. /// diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs index fb5f602959d..dc381b67462 100644 --- a/tokio/src/net/unix/datagram/socket.rs +++ b/tokio/src/net/unix/datagram/socket.rs @@ -10,6 +10,8 @@ use std::os::unix::net; use std::path::Path; use std::task::{Context, Poll}; +use bytes::BufMut; + cfg_net_unix! { /// An I/O object representing a Unix datagram socket. /// @@ -652,6 +654,14 @@ impl UnixDatagram { .try_io(Interest::READABLE, || self.io.recv(buf)) } + /// Try to read data from the stream into the provided buffer, advancing the + /// buffer's internal cursor, returning how many bytes were read. + pub fn try_recv_buf(&self, buf: &mut B) -> io::Result { + self.io + .registration() + .try_io(Interest::READABLE, || unsafe { self.io.read_buf(buf) }) + } + /// Sends data on the socket to the specified address. /// /// # Examples diff --git a/tokio/src/net/unix/stream.rs b/tokio/src/net/unix/stream.rs index dc929dc21ea..94cb6adf1ca 100644 --- a/tokio/src/net/unix/stream.rs +++ b/tokio/src/net/unix/stream.rs @@ -15,6 +15,8 @@ use std::path::Path; use std::pin::Pin; use std::task::{Context, Poll}; +use bytes::BufMut; + cfg_net_unix! { /// A structure representing a connected Unix socket. /// @@ -267,6 +269,14 @@ impl UnixStream { .try_io(Interest::READABLE, || (&*self.io).read(buf)) } + /// Try to read data from the stream into the provided buffer, advancing the + /// buffer's internal cursor, returning how many bytes were read. + pub fn try_read_buf(&self, buf: &mut B) -> io::Result { + self.io + .registration() + .try_io(Interest::READABLE, || unsafe { self.io.read_buf(buf) }) + } + /// Wait for the socket to become writable. /// /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually