From a46dd5047e03007e68cad69f5fee4462672fee0d Mon Sep 17 00:00:00 2001 From: zonyitoo Date: Sat, 26 Jun 2021 00:43:54 +0800 Subject: [PATCH] clearing write readiness if TFO connect returns EINPROGRESS - ref #555 - imperfect until tokio-rs/tokio#3888 was merged --- Cargo.lock | 14 +-- Cargo.toml | 3 + .../src/net/sys/unix/bsd/freebsd.rs | 71 ++++++----- .../shadowsocks/src/net/sys/unix/bsd/macos.rs | 57 ++++++--- .../shadowsocks/src/net/sys/unix/linux/mod.rs | 116 +++++++++++------- crates/shadowsocks/src/net/sys/windows/mod.rs | 81 ++++++------ 6 files changed, 201 insertions(+), 141 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b9f171838d75..bca8c8dc4c5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -531,9 +531,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.1.18" +version = "0.1.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "322f4de77956e22ed0e5032c359a0f1273f1f7f0d79bfa3b8ffbc730d7fbcc5c" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" dependencies = [ "libc", ] @@ -1015,9 +1015,9 @@ checksum = "28988d872ab76095a6e6ac88d99b54fd267702734fd7ffe610ca27f533ddb95a" [[package]] name = "openssl-sys" -version = "0.9.64" +version = "0.9.65" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "209efc2fe0e980c8849efacdb567f975a1c80245c4f6980d6f012733bfa851af" +checksum = "7a7907e3bfa08bb85105209cdfcb6c63d109f8f6c1ed6ca318fff5c1853fbc1d" dependencies = [ "autocfg", "cc", @@ -1814,8 +1814,7 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" version = "1.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fb2ed024293bb19f7a5dc54fe83bf86532a44c12a2bb8ba40d64a4509395ca2" +source = "git+https://github.com/zonyitoo/tokio.git#f051b2cae1eede47f3ef1b0b53097104f05097e9" dependencies = [ "autocfg", "bytes", @@ -1844,8 +1843,7 @@ dependencies = [ [[package]] name = "tokio-macros" version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c49e3df43841dafb86046472506755d8501c5615673955f6aa17181125d13c37" +source = "git+https://github.com/zonyitoo/tokio.git#f051b2cae1eede47f3ef1b0b53097104f05097e9" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 6dcfe64c690c..f119462531f7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -141,3 +141,6 @@ byteorder = "1.3" env_logger = "0.8" byte_string = "1.0" tokio = { version = "1", features = ["net", "time", "macros", "io-util"]} + +[patch.crates-io] +tokio = { git = "https://github.com/zonyitoo/tokio.git" } diff --git a/crates/shadowsocks/src/net/sys/unix/bsd/freebsd.rs b/crates/shadowsocks/src/net/sys/unix/bsd/freebsd.rs index cd4c6a523002..9f76c97e9f6a 100644 --- a/crates/shadowsocks/src/net/sys/unix/bsd/freebsd.rs +++ b/crates/shadowsocks/src/net/sys/unix/bsd/freebsd.rs @@ -26,10 +26,11 @@ use crate::net::{ enum TcpStreamState { Connected, FastOpenConnect(SocketAddr), + FastOpenConnecting, } /// A `TcpStream` that supports TFO (TCP Fast Open) -#[pin_project] +#[pin_project(project = TcpStreamProj)] pub struct TcpStream { #[pin] inner: TokioTcpStream, @@ -116,47 +117,55 @@ impl AsyncWrite for TcpStream { // Example program: // - // Wait until socket is writable - ready!(this.inner.poll_write_ready(cx))?; + let TcpStreamProj { inner, state } = this; + + let stream = inner.get_mut(); + let n = ready!(stream.poll_write_io(cx, || { + unsafe { + let saddr = SockAddr::from(*addr); + + let ret = libc::sendto( + stream.as_raw_fd(), + buf.as_ptr() as *const libc::c_void, + buf.len(), + 0, // Yes, BSD doesn't need MSG_FASTOPEN + saddr.as_ptr(), + saddr.len(), + ); + + if ret >= 0 { + Ok(ret as usize) + } else { + // Error occurs + let err = io::Error::last_os_error(); - unsafe { - let saddr = SockAddr::from(*addr); - - let ret = libc::sendto( - this.inner.as_raw_fd(), - buf.as_ptr() as *const libc::c_void, - buf.len(), - 0, // Yes, BSD doesn't need MSG_FASTOPEN - saddr.as_ptr(), - saddr.len(), - ); - - if ret >= 0 { - // Connect successfully. - *(this.state) = TcpStreamState::Connected; - return Ok(ret as usize).into(); - } else { - // Error occurs - let err = io::Error::last_os_error(); - - // EAGAIN, EWOULDBLOCK - if err.kind() != ErrorKind::WouldBlock { // EINPROGRESS if let Some(libc::EINPROGRESS) = err.raw_os_error() { // For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available. // If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect(). // // So in this state. We have to loop again to call `poll_write` for sending the first packet. - *(this.state) = TcpStreamState::Connected; + *state = TcpStreamState::FastOpenConnecting; + + // Let `poll_write_io` clears the write readiness. + Err(ErrorKind::WouldBlock.into()) } else { - // Other errors - return Err(err).into(); + // Other errors, including EAGAIN, EWOULDBLOCK + Err(err) } - } else { - // Pending on poll_write_ready } } - } + }))?; + + // Connect successfully with fast open + *state = TcpStreamState::Connected; + return Ok(n).into(); + } + + TcpStreamState::FastOpenConnecting => { + ready!(this.inner.poll_write_ready(cx))?; + + *(this.state) = TcpStreamState::Connected; } TcpStreamState::Connected => return this.inner.poll_write(cx, buf), diff --git a/crates/shadowsocks/src/net/sys/unix/bsd/macos.rs b/crates/shadowsocks/src/net/sys/unix/bsd/macos.rs index dce91dfc731f..9a9ccb2050cf 100644 --- a/crates/shadowsocks/src/net/sys/unix/bsd/macos.rs +++ b/crates/shadowsocks/src/net/sys/unix/bsd/macos.rs @@ -27,10 +27,11 @@ use crate::net::{ enum TcpStreamState { Connected, FastOpenWrite, + FastOpenConnecting, } /// A `TcpStream` that supports TFO (TCP Fast Open) -#[pin_project] +#[pin_project(project = TcpStreamProj)] pub struct TcpStream { #[pin] inner: TokioTcpStream, @@ -129,27 +130,45 @@ impl AsyncWrite for TcpStream { // (NOT SURE) If remote server doesn't support TFO or this is the first connection, // it may return EINPROGRESS just like other platforms (Linux, FreeBSD). - match ready!(this.inner.poll_write(cx, buf)) { - Ok(n) => { - *(this.state) = TcpStreamState::Connected; - return Ok(n).into(); - } - Err(err) => { - // EAGAIN and EWOULDBLOCK should have been handled by tokio - // - // EINPROGRESS - if let Some(libc::EINPROGRESS) = err.raw_os_error() { - // For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available. - // If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect(). - // - // So in this state. We have to loop again to call `poll_write` for sending the first packet. - *(this.state) = TcpStreamState::Connected; + let TcpStreamProj { inner, state } = this; + + let stream = inner.get_mut(); + let n = ready!(stream.poll_write_io(cx, || { + unsafe { + let ret = libc::send(stream.as_raw_fd(), buf.as_ptr() as *const libc::c_void, buf.len(), 0); + if ret >= 0 { + Ok(ret as usize) } else { - // Other errors - return Err(err).into(); + let err = io::Error::last_os_error(); + // EAGAIN and EWOULDBLOCK should have been handled by tokio + // + // EINPROGRESS + if let Some(libc::EINPROGRESS) = err.raw_os_error() { + // For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available. + // If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect(). + // + // So in this state. We have to loop again to call `poll_write` for sending the first packet. + *state = TcpStreamState::FastOpenConnecting; + + // Let `poll_write_io` clears the write readiness. + Err(ErrorKind::WouldBlock.into()) + } else { + // Other errors, including EAGAIN + Err(err) + } } } - } + }))?; + + // Connected successfully with fast open + *state = TcpStreamState::Connected; + return Ok(n).into(); + } + + TcpStreamState::FastOpenConnecting => { + ready!(this.inner.poll_write_ready(cx))?; + + *(this.state) = TcpStreamState::Connected; } TcpStreamState::Connected => return this.inner.poll_write(cx, buf), diff --git a/crates/shadowsocks/src/net/sys/unix/linux/mod.rs b/crates/shadowsocks/src/net/sys/unix/linux/mod.rs index b4739927475d..ca6059381784 100644 --- a/crates/shadowsocks/src/net/sys/unix/linux/mod.rs +++ b/crates/shadowsocks/src/net/sys/unix/linux/mod.rs @@ -29,10 +29,11 @@ enum TcpStreamState { Connected, FastOpenConnect(SocketAddr), FastOpenWrite, + FastOpenConnecting, } /// A `TcpStream` that supports TFO (TCP Fast Open) -#[pin_project] +#[pin_project(project = TcpStreamProj)] pub struct TcpStream { #[pin] inner: TokioTcpStream, @@ -187,70 +188,95 @@ impl AsyncWrite for TcpStream { // // Uses sendto as BSD-like systems - // Wait until socket is writable - ready!(this.inner.poll_write_ready(cx))?; + let TcpStreamProj { inner, state } = this; + + let stream = inner.get_mut(); + ready!(stream.poll_write_io(cx, || { + unsafe { + let saddr = SockAddr::from(*addr); + + let ret = libc::sendto( + stream.as_raw_fd(), + buf.as_ptr() as *const libc::c_void, + buf.len(), + libc::MSG_FASTOPEN, + saddr.as_ptr(), + saddr.len(), + ); + + if ret >= 0 { + Ok(ret as usize) + } else { + // Error occurs + let err = io::Error::last_os_error(); - unsafe { - let saddr = SockAddr::from(*addr); - - let ret = libc::sendto( - this.inner.as_raw_fd(), - buf.as_ptr() as *const libc::c_void, - buf.len(), - libc::MSG_FASTOPEN, - saddr.as_ptr(), - saddr.len(), - ); - - if ret >= 0 { - // Connect successfully. - *(this.state) = TcpStreamState::Connected; - return Ok(ret as usize).into(); - } else { - // Error occurs - let err = io::Error::last_os_error(); - - // EAGAIN, EWOULDBLOCK - if err.kind() != ErrorKind::WouldBlock { // EINPROGRESS if let Some(libc::EINPROGRESS) = err.raw_os_error() { // For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available. // If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect(). // // So in this state. We have to loop again to call `poll_write` for sending the first packet. - *(this.state) = TcpStreamState::Connected; + *state = TcpStreamState::FastOpenConnecting; + + // Let `try_write_io` clears the write readiness. + Err(ErrorKind::WouldBlock.into()) } else { - // Other errors - return Err(err).into(); + // Other errors, including EAGAIN, EWOULDBLOCK + Err(err) } - } else { - // Pending on poll_write_ready } } - } + }))?; + + // Connect successfully with fast open + *state = TcpStreamState::Connected; + return Ok(n).into(); } TcpStreamState::FastOpenWrite => { // First `write` after `TCP_FASTOPEN_CONNECT` // Kernel >= 4.11 - match ready!(this.inner.poll_write(cx, buf)) { - Ok(n) => { - *(this.state) = TcpStreamState::Connected; - return Ok(n).into(); - } - Err(err) => { - // EAGAIN and EWOULDBLOCK should have been handled by tokio - // - // EINPROGRESS - if let Some(libc::EINPROGRESS) = err.raw_os_error() { - // loop again to call `poll_write` for sending the first packet - *(this.state) = TcpStreamState::Connected; + let n = ready!(this.inner.get_mut().poll_write_io(cx, || { + unsafe { + let ret = libc::send( + this.inner.as_raw_fd(), + buf.as_ptr() as *const libc::c_void, + buf.len(), + 0, + ); + + if ret >= 0 { + Ok(ret as usize) } else { - return Err(err).into(); + let err = io::Error::last_os_error(); + // EINPROGRESS + if let Some(libc::EINPROGRESS) = err.raw_os_error() { + // For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available. + // If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect(). + // + // So in this state. We have to loop again to call `poll_write` for sending the first packet. + *(this.state) = TcpStreamState::FastOpenConnecting; + + // Let `poll_write_io` clears the write readiness. + Err(ErrorKind::WouldBlock.into()) + } else { + // Other errors, including EAGAIN, EWOULDBLOCK + Err(err) + } } } - } + }))?; + + // Connect successfully with fast open + *(this.state) = TcpStreamState::Connected; + return Ok(n).into(); + } + + TcpStreamState::FastOpenConnecting => { + ready!(this.inner.poll_write_ready(cx))?; + + *(this.state) = TcpStreamState::Connected; } TcpStreamState::Connected => return this.inner.poll_write(cx, buf), diff --git a/crates/shadowsocks/src/net/sys/windows/mod.rs b/crates/shadowsocks/src/net/sys/windows/mod.rs index 96d272b5f247..82c4c251cb9a 100644 --- a/crates/shadowsocks/src/net/sys/windows/mod.rs +++ b/crates/shadowsocks/src/net/sys/windows/mod.rs @@ -233,9 +233,8 @@ impl AsyncWrite for TcpStream { loop { match this.state { - TcpStreamState::Connected => { - return this.inner.poll_write(cx, buf); - } + TcpStreamState::Connected => return this.inner.poll_write(cx, buf), + TcpStreamState::FastOpenConnect(addr) => { unsafe { // https://docs.microsoft.com/en-us/windows/win32/api/mswsock/nc-mswsock-lpfn_connectex @@ -279,44 +278,50 @@ impl AsyncWrite for TcpStream { *(this.state) = TcpStreamState::FastOpenConnecting(overlapped); } } - TcpStreamState::FastOpenConnecting(ref mut overlapped) => { - // Wait until socket is writable - ready!(this.inner.poll_write_ready(cx))?; - - unsafe { - let sock = this.inner.as_raw_socket() as SOCKET; - - let mut bytes_sent: DWORD = 0; - let mut flags: DWORD = 0; - // Fetch ConnectEx's result in a non-blocking way. - let ret: BOOL = WSAGetOverlappedResult( - sock, - overlapped.as_mut() as LPOVERLAPPED, - &mut bytes_sent as LPDWORD, - FALSE, // fWait = false, non-blocking, returns WSA_IO_INCOMPLETE - &mut flags as LPDWORD, - ); - - if ret == TRUE { - // Get ConnectEx's result successfully. Socket is connected - - // Make getpeername() works - set_update_connect_context(sock)?; - - debug_assert!(bytes_sent as usize <= buf.len()); - - *(this.state) = TcpStreamState::Connected; - return Ok(bytes_sent as usize).into(); + TcpStreamState::FastOpenConnecting(ref mut overlapped) => { + let n = ready!(this.inner.poll_write_io(cx, || { + unsafe { + let sock = this.inner.as_raw_socket() as SOCKET; + + let mut bytes_sent: DWORD = 0; + let mut flags: DWORD = 0; + + // Fetch ConnectEx's result in a non-blocking way. + let ret: BOOL = WSAGetOverlappedResult( + sock, + overlapped.as_mut() as LPOVERLAPPED, + &mut bytes_sent as LPDWORD, + FALSE, // fWait = false, non-blocking, returns WSA_IO_INCOMPLETE + &mut flags as LPDWORD, + ); + + if ret == TRUE { + // Get ConnectEx's result successfully. Socket is connected + + // Make getpeername() works + set_update_connect_context(sock)?; + + debug_assert!(bytes_sent as usize <= buf.len()); + + Ok(bytes_sent as usize) + } + + let err = WSAGetLastError(); + if err == WSA_IO_INCOMPLETE { + // ConnectEx is still not connected. Wait for the next round + // + // Let `try_write_io` clears the write readiness. + Err(ErrorKind::WouldBlock.into()) + } else { + Err(io::Error::from_raw_os_error(err)) + } } + }))?; - let err = WSAGetLastError(); - if err == WSA_IO_INCOMPLETE { - // ConnectEx is still not connected. Wait for the next round - } else { - return Err(io::Error::from_raw_os_error(err)).into(); - } - } + // Connect successfully with fast open + *(this.state) = TcpStreamState::Connected; + return Ok(n).into(); } } }