Skip to content

Commit

Permalink
Update mio to 0.8 (#4270)
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Dec 31, 2021
1 parent 47feaa7 commit ee0e811
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 51 deletions.
19 changes: 9 additions & 10 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,19 @@ macros = ["tokio-macros"]
stats = []
net = [
"libc",
"mio/net",
"mio/os-ext",
"mio/os-poll",
"mio/os-util",
"mio/tcp",
"mio/udp",
"mio/uds",
"socket2/all",
"winapi/namedpipeapi",
]
process = [
"bytes",
"once_cell",
"libc",
"mio/net",
"mio/os-ext",
"mio/os-poll",
"mio/os-util",
"mio/uds",
"signal-hook-registry",
"winapi/threadpoollegacyapiset",
]
Expand All @@ -75,9 +74,9 @@ rt-multi-thread = [
signal = [
"once_cell",
"libc",
"mio/net",
"mio/os-ext",
"mio/os-poll",
"mio/uds",
"mio/os-util",
"signal-hook-registry",
"winapi/consoleapi",
]
Expand All @@ -94,9 +93,10 @@ pin-project-lite = "0.2.0"
bytes = { version = "1.0.0", optional = true }
once_cell = { version = "1.5.2", optional = true }
memchr = { version = "2.2", optional = true }
mio = { version = "0.7.6", optional = true }
mio = { version = "0.8.0", optional = true }
num_cpus = { version = "1.8.0", optional = true }
parking_lot = { version = "0.11.0", optional = true }
socket2 = { version = "0.4.2", optional = true }

# Currently unstable. The API exposed by these features may be broken at any time.
# Requires `--cfg tokio_unstable` to enable.
Expand Down Expand Up @@ -128,7 +128,6 @@ proptest = "1"
rand = "0.8.0"
tempfile = "3.1.0"
async-stream = "0.3"
socket2 = "0.4"

[target.'cfg(target_os = "freebsd")'.dev-dependencies]
mio-aio = { version = "0.6.0", features = ["tokio"] }
Expand Down
64 changes: 42 additions & 22 deletions tokio/src/net/tcp/socket.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::net::{TcpListener, TcpStream};

use std::convert::TryInto;
use std::fmt;
use std::io;
use std::net::SocketAddr;
Expand Down Expand Up @@ -84,7 +85,7 @@ cfg_net! {
/// [`socket2`]: https://docs.rs/socket2/
#[cfg_attr(docsrs, doc(alias = "connect_std"))]
pub struct TcpSocket {
inner: mio::net::TcpSocket,
inner: socket2::Socket,
}
}

Expand Down Expand Up @@ -119,7 +120,11 @@ impl TcpSocket {
/// }
/// ```
pub fn new_v4() -> io::Result<TcpSocket> {
let inner = mio::net::TcpSocket::new_v4()?;
let inner = socket2::Socket::new(
socket2::Domain::IPV4,
socket2::Type::STREAM,
Some(socket2::Protocol::TCP),
)?;
Ok(TcpSocket { inner })
}

Expand Down Expand Up @@ -153,7 +158,11 @@ impl TcpSocket {
/// }
/// ```
pub fn new_v6() -> io::Result<TcpSocket> {
let inner = mio::net::TcpSocket::new_v6()?;
let inner = socket2::Socket::new(
socket2::Domain::IPV6,
socket2::Type::STREAM,
Some(socket2::Protocol::TCP),
)?;
Ok(TcpSocket { inner })
}

Expand Down Expand Up @@ -184,7 +193,7 @@ impl TcpSocket {
/// }
/// ```
pub fn set_reuseaddr(&self, reuseaddr: bool) -> io::Result<()> {
self.inner.set_reuseaddr(reuseaddr)
self.inner.set_reuse_address(reuseaddr)
}

/// Retrieves the value set for `SO_REUSEADDR` on this socket.
Expand All @@ -210,7 +219,7 @@ impl TcpSocket {
/// }
/// ```
pub fn reuseaddr(&self) -> io::Result<bool> {
self.inner.get_reuseaddr()
self.inner.reuse_address()
}

/// Allows the socket to bind to an in-use port. Only available for unix systems
Expand Down Expand Up @@ -244,7 +253,7 @@ impl TcpSocket {
doc(cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos"))))
)]
pub fn set_reuseport(&self, reuseport: bool) -> io::Result<()> {
self.inner.set_reuseport(reuseport)
self.inner.set_reuse_port(reuseport)
}

/// Allows the socket to bind to an in-use port. Only available for unix systems
Expand Down Expand Up @@ -279,14 +288,14 @@ impl TcpSocket {
doc(cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos"))))
)]
pub fn reuseport(&self) -> io::Result<bool> {
self.inner.get_reuseport()
self.inner.reuse_port()
}

/// Sets the size of the TCP send buffer on this socket.
///
/// On most operating systems, this sets the `SO_SNDBUF` socket option.
pub fn set_send_buffer_size(&self, size: u32) -> io::Result<()> {
self.inner.set_send_buffer_size(size)
self.inner.set_send_buffer_size(size as usize)
}

/// Returns the size of the TCP send buffer for this socket.
Expand All @@ -313,14 +322,14 @@ impl TcpSocket {
///
/// [`set_send_buffer_size`]: #method.set_send_buffer_size
pub fn send_buffer_size(&self) -> io::Result<u32> {
self.inner.get_send_buffer_size()
self.inner.send_buffer_size().map(|n| n as u32)
}

/// Sets the size of the TCP receive buffer on this socket.
///
/// On most operating systems, this sets the `SO_RCVBUF` socket option.
pub fn set_recv_buffer_size(&self, size: u32) -> io::Result<()> {
self.inner.set_recv_buffer_size(size)
self.inner.set_recv_buffer_size(size as usize)
}

/// Returns the size of the TCP receive buffer for this socket.
Expand All @@ -347,7 +356,7 @@ impl TcpSocket {
///
/// [`set_recv_buffer_size`]: #method.set_recv_buffer_size
pub fn recv_buffer_size(&self) -> io::Result<u32> {
self.inner.get_recv_buffer_size()
self.inner.recv_buffer_size().map(|n| n as u32)
}

/// Sets the linger duration of this socket by setting the SO_LINGER option.
Expand Down Expand Up @@ -395,7 +404,9 @@ impl TcpSocket {
/// }
/// ```
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.inner.get_localaddr()
self.inner
.local_addr()
.map(|addr| addr.as_socket().unwrap())
}

/// Binds the socket to the given address.
Expand Down Expand Up @@ -427,7 +438,7 @@ impl TcpSocket {
/// }
/// ```
pub fn bind(&self, addr: SocketAddr) -> io::Result<()> {
self.inner.bind(addr)
self.inner.bind(&addr.into())
}

/// Establishes a TCP connection with a peer at the specified socket address.
Expand Down Expand Up @@ -463,7 +474,13 @@ impl TcpSocket {
/// }
/// ```
pub async fn connect(self, addr: SocketAddr) -> io::Result<TcpStream> {
let mio = self.inner.connect(addr)?;
self.inner.connect(&addr.into())?;

#[cfg(windows)]
let mio = unsafe { mio::net::TcpStream::from_raw_socket(self.inner.into_raw_socket()) };
#[cfg(unix)]
let mio = unsafe { mio::net::TcpStream::from_raw_fd(self.inner.into_raw_fd()) };

TcpStream::connect_mio(mio).await
}

Expand Down Expand Up @@ -503,7 +520,14 @@ impl TcpSocket {
/// }
/// ```
pub fn listen(self, backlog: u32) -> io::Result<TcpListener> {
let mio = self.inner.listen(backlog)?;
let backlog = backlog.try_into().unwrap_or(i32::MAX);
self.inner.listen(backlog)?;

#[cfg(windows)]
let mio = unsafe { mio::net::TcpListener::from_raw_socket(self.inner.into_raw_socket()) };
#[cfg(unix)]
let mio = unsafe { mio::net::TcpListener::from_raw_fd(self.inner.into_raw_fd()) };

TcpListener::new(mio)
}

Expand All @@ -523,7 +547,7 @@ impl TcpSocket {
///
/// #[tokio::main]
/// async fn main() -> std::io::Result<()> {
///
///
/// let socket2_socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
///
/// let socket = TcpSocket::from_std_stream(socket2_socket.into());
Expand All @@ -534,16 +558,12 @@ impl TcpSocket {
pub fn from_std_stream(std_stream: std::net::TcpStream) -> TcpSocket {
#[cfg(unix)]
{
use std::os::unix::io::{FromRawFd, IntoRawFd};

let raw_fd = std_stream.into_raw_fd();
unsafe { TcpSocket::from_raw_fd(raw_fd) }
}

#[cfg(windows)]
{
use std::os::windows::io::{FromRawSocket, IntoRawSocket};

let raw_socket = std_stream.into_raw_socket();
unsafe { TcpSocket::from_raw_socket(raw_socket) }
}
Expand Down Expand Up @@ -572,7 +592,7 @@ impl FromRawFd for TcpSocket {
/// The caller is responsible for ensuring that the socket is in
/// non-blocking mode.
unsafe fn from_raw_fd(fd: RawFd) -> TcpSocket {
let inner = mio::net::TcpSocket::from_raw_fd(fd);
let inner = socket2::Socket::from_raw_fd(fd);
TcpSocket { inner }
}
}
Expand Down Expand Up @@ -607,7 +627,7 @@ impl FromRawSocket for TcpSocket {
/// The caller is responsible for ensuring that the socket is in
/// non-blocking mode.
unsafe fn from_raw_socket(socket: RawSocket) -> TcpSocket {
let inner = mio::net::TcpSocket::from_raw_socket(socket);
let inner = socket2::Socket::from_raw_socket(socket);
TcpSocket { inner }
}
}
26 changes: 7 additions & 19 deletions tokio/src/net/tcp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ impl TcpStream {
/// // if the readiness event is a false positive.
/// match stream.try_read(&mut data) {
/// Ok(n) => {
/// println!("read {} bytes", n);
/// println!("read {} bytes", n);
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
Expand Down Expand Up @@ -1090,9 +1090,8 @@ impl TcpStream {
/// # }
/// ```
pub fn linger(&self) -> io::Result<Option<Duration>> {
let mio_socket = std::mem::ManuallyDrop::new(self.to_mio());

mio_socket.get_linger()
let socket = self.to_socket();
socket.linger()
}

/// Sets the linger duration of this socket by setting the SO_LINGER option.
Expand All @@ -1117,23 +1116,12 @@ impl TcpStream {
/// # }
/// ```
pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
let mio_socket = std::mem::ManuallyDrop::new(self.to_mio());

mio_socket.set_linger(dur)
let socket = self.to_socket();
socket.set_linger(dur)
}

fn to_mio(&self) -> mio::net::TcpSocket {
#[cfg(windows)]
{
use std::os::windows::io::{AsRawSocket, FromRawSocket};
unsafe { mio::net::TcpSocket::from_raw_socket(self.as_raw_socket()) }
}

#[cfg(unix)]
{
use std::os::unix::io::{AsRawFd, FromRawFd};
unsafe { mio::net::TcpSocket::from_raw_fd(self.as_raw_fd()) }
}
fn to_socket(&self) -> socket2::SockRef<'_> {
socket2::SockRef::from(self)
}

/// Gets the value of the `IP_TTL` option for this socket.
Expand Down

0 comments on commit ee0e811

Please sign in to comment.