Skip to content

Commit

Permalink
net: Add try_read_buf and try_recv_buf
Browse files Browse the repository at this point in the history
  • Loading branch information
cssivision committed Dec 26, 2020
1 parent 2330edc commit 6a28230
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 0 deletions.
32 changes: 32 additions & 0 deletions tokio/src/io/poll_evented.rs
Expand Up @@ -5,6 +5,8 @@ use std::fmt;
use std::io;
use std::ops::Deref;

use bytes::BufMut;

cfg_io_driver! {
/// Associates an I/O resource that implements the [`std::io::Read`] and/or
/// [`std::io::Write`] traits with the reactor that drives it.
Expand Down Expand Up @@ -164,6 +166,36 @@ feature! {
Poll::Ready(Ok(()))
}

// Safety: The caller must ensure that `E` can read into uninitialized memory
pub(crate) unsafe fn read_buf<'a, B: BufMut>(
&'a self,
buf: &'a mut B,
) -> io::Result<usize>
where
&'a E: io::Read + 'a,
{
use std::io::Read;

if !buf.has_remaining_mut() {
return Ok(0);
}

let dst = buf.chunk_mut();
let dst = &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>]);
let mut buf = ReadBuf::uninit(dst);

let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]);
let n = self.io.as_ref().unwrap().read(b)?;

// Safety: We trust `TcpStream::read` to have filled up `n` bytes in the
// buffer.
buf.assume_init(n);

buf.advance(n);

Ok(buf.filled().len())
}

pub(crate) fn poll_write<'a>(&'a self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>>
where
&'a E: io::Write + 'a,
Expand Down
10 changes: 10 additions & 0 deletions tokio/src/net/tcp/stream.rs
Expand Up @@ -17,6 +17,8 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

use bytes::BufMut;

cfg_net! {
/// A TCP stream between a local and a remote socket.
///
Expand Down Expand Up @@ -559,6 +561,14 @@ impl TcpStream {
.try_io(Interest::READABLE, || (&*self.io).read(buf))
}

/// Try to read data from the stream into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
self.io
.registration()
.try_io(Interest::READABLE, || unsafe { self.io.read_buf(buf) })
}

/// Wait for the socket to become writable.
///
/// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
Expand Down
10 changes: 10 additions & 0 deletions tokio/src/net/udp.rs
Expand Up @@ -7,6 +7,8 @@ use std::io;
use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::task::{Context, Poll};

use bytes::BufMut;

cfg_net! {
/// A UDP socket
///
Expand Down Expand Up @@ -683,6 +685,14 @@ impl UdpSocket {
.try_io(Interest::READABLE, || self.io.recv(buf))
}

/// Try to receive data from the stream into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
pub fn try_recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
self.io
.registration()
.try_io(Interest::READABLE, || unsafe { self.io.read_buf(buf) })
}

/// Sends data on the socket to the given address. On success, returns the
/// number of bytes written.
///
Expand Down
10 changes: 10 additions & 0 deletions tokio/src/net/unix/datagram/socket.rs
Expand Up @@ -10,6 +10,8 @@ use std::os::unix::net;
use std::path::Path;
use std::task::{Context, Poll};

use bytes::BufMut;

cfg_net_unix! {
/// An I/O object representing a Unix datagram socket.
///
Expand Down Expand Up @@ -652,6 +654,14 @@ impl UnixDatagram {
.try_io(Interest::READABLE, || self.io.recv(buf))
}

/// Try to read data from the stream into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
pub fn try_recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
self.io
.registration()
.try_io(Interest::READABLE, || unsafe { self.io.read_buf(buf) })
}

/// Sends data on the socket to the specified address.
///
/// # Examples
Expand Down
10 changes: 10 additions & 0 deletions tokio/src/net/unix/stream.rs
Expand Up @@ -15,6 +15,8 @@ use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::BufMut;

cfg_net_unix! {
/// A structure representing a connected Unix socket.
///
Expand Down Expand Up @@ -267,6 +269,14 @@ impl UnixStream {
.try_io(Interest::READABLE, || (&*self.io).read(buf))
}

/// Try to read data from the stream into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
self.io
.registration()
.try_io(Interest::READABLE, || unsafe { self.io.read_buf(buf) })
}

/// Wait for the socket to become writable.
///
/// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
Expand Down

0 comments on commit 6a28230

Please sign in to comment.