Skip to content

Commit

Permalink
clearing write readiness if TFO connect returns EINPROGRESS
Browse files Browse the repository at this point in the history
- ref #555
- imperfect until tokio-rs/tokio#3888 was merged
  • Loading branch information
zonyitoo committed Jun 25, 2021
1 parent 3743ffe commit af15d94
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 163 deletions.
14 changes: 6 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Expand Up @@ -141,3 +141,6 @@ byteorder = "1.3"
env_logger = "0.8"
byte_string = "1.0"
tokio = { version = "1", features = ["net", "time", "macros", "io-util"]}

[patch.crates-io]
tokio = { git = "https://github.com/zonyitoo/tokio.git" }
66 changes: 32 additions & 34 deletions crates/shadowsocks/src/net/sys/unix/bsd/freebsd.rs
Expand Up @@ -29,7 +29,7 @@ enum TcpStreamState {
}

/// A `TcpStream` that supports TFO (TCP Fast Open)
#[pin_project]
#[pin_project(project = TcpStreamProj)]
pub struct TcpStream {
#[pin]
inner: TokioTcpStream,
Expand Down Expand Up @@ -105,25 +105,25 @@ impl AsyncRead for TcpStream {
}

impl AsyncWrite for TcpStream {
fn poll_write(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
loop {
let this = self.as_mut().project();
fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
let TcpStreamProj { inner, state } = self.project();

match this.state {
TcpStreamState::FastOpenConnect(addr) => {
// TCP_FASTOPEN was supported since FreeBSD 12.0
//
// Example program:
// <https://people.freebsd.org/~pkelsey/tfo-tools/tfo-client.c>
match *state {
TcpStreamState::Connected => inner.poll_write(cx, buf),

// Wait until socket is writable
ready!(this.inner.poll_write_ready(cx))?;
TcpStreamState::FastOpenConnect(addr) => {
// TCP_FASTOPEN was supported since FreeBSD 12.0
//
// Example program:
// <https://people.freebsd.org/~pkelsey/tfo-tools/tfo-client.c>

unsafe {
let saddr = SockAddr::from(*addr);
let saddr = SockAddr::from(addr);

let stream = inner.get_mut();
let n = ready!(stream.poll_write_io(cx, || {
unsafe {
let ret = libc::sendto(
this.inner.as_raw_fd(),
stream.as_raw_fd(),
buf.as_ptr() as *const libc::c_void,
buf.len(),
0, // Yes, BSD doesn't need MSG_FASTOPEN
Expand All @@ -132,34 +132,32 @@ impl AsyncWrite for TcpStream {
);

if ret >= 0 {
// Connect successfully.
*(this.state) = TcpStreamState::Connected;
return Ok(ret as usize).into();
Ok(ret as usize)
} else {
// Error occurs
let err = io::Error::last_os_error();

// EAGAIN, EWOULDBLOCK
if err.kind() != ErrorKind::WouldBlock {
// EINPROGRESS
if let Some(libc::EINPROGRESS) = err.raw_os_error() {
// For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available.
// If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect().
//
// So in this state. We have to loop again to call `poll_write` for sending the first packet.
*(this.state) = TcpStreamState::Connected;
} else {
// Other errors
return Err(err).into();
}
// EINPROGRESS
if let Some(libc::EINPROGRESS) = err.raw_os_error() {
// For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available.
// If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect().
//
// So in this state. We have to loop again to call `poll_write` for sending the first packet.
*state = TcpStreamState::Connected;

// Let `poll_write_io` clears the write readiness.
Err(ErrorKind::WouldBlock.into())
} else {
// Pending on poll_write_ready
// Other errors, including EAGAIN, EWOULDBLOCK
Err(err)
}
}
}
}
}))?;

TcpStreamState::Connected => return this.inner.poll_write(cx, buf),
// Connect successfully with fast open
*state = TcpStreamState::Connected;
Ok(n).into()
}
}
}
Expand Down
56 changes: 32 additions & 24 deletions crates/shadowsocks/src/net/sys/unix/bsd/macos.rs
Expand Up @@ -30,7 +30,7 @@ enum TcpStreamState {
}

/// A `TcpStream` that supports TFO (TCP Fast Open)
#[pin_project]
#[pin_project(project = TcpStreamProj)]
pub struct TcpStream {
#[pin]
inner: TokioTcpStream,
Expand Down Expand Up @@ -117,24 +117,27 @@ impl AsyncRead for TcpStream {
}

impl AsyncWrite for TcpStream {
fn poll_write(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
loop {
let this = self.as_mut().project();

match this.state {
TcpStreamState::FastOpenWrite => {
// `CONNECT_RESUME_ON_READ_WRITE` is set when calling `connectx`,
// so the first call of `send` will perform the actual SYN with TFO cookie.
//
// (NOT SURE) If remote server doesn't support TFO or this is the first connection,
// it may return EINPROGRESS just like other platforms (Linux, FreeBSD).

match ready!(this.inner.poll_write(cx, buf)) {
Ok(n) => {
*(this.state) = TcpStreamState::Connected;
return Ok(n).into();
}
Err(err) => {
fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
let TcpStreamProj { inner, state } = self.project();

match *state {
TcpStreamState::Connected => inner.poll_write(cx, buf),

TcpStreamState::FastOpenWrite => {
// `CONNECT_RESUME_ON_READ_WRITE` is set when calling `connectx`,
// so the first call of `send` will perform the actual SYN with TFO cookie.
//
// (NOT SURE) If remote server doesn't support TFO or this is the first connection,
// it may return EINPROGRESS just like other platforms (Linux, FreeBSD).

let stream = inner.get_mut();
let n = ready!(stream.poll_write_io(cx, || {
unsafe {
let ret = libc::send(stream.as_raw_fd(), buf.as_ptr() as *const libc::c_void, buf.len(), 0);
if ret >= 0 {
Ok(ret as usize)
} else {
let err = io::Error::last_os_error();
// EAGAIN and EWOULDBLOCK should have been handled by tokio
//
// EINPROGRESS
Expand All @@ -143,16 +146,21 @@ impl AsyncWrite for TcpStream {
// If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect().
//
// So in this state. We have to loop again to call `poll_write` for sending the first packet.
*(this.state) = TcpStreamState::Connected;
*state = TcpStreamState::Connected;

// Let `poll_write_io` clears the write readiness.
Err(ErrorKind::WouldBlock.into())
} else {
// Other errors
return Err(err).into();
// Other errors, including EAGAIN
Err(err)
}
}
}
}
}))?;

TcpStreamState::Connected => return this.inner.poll_write(cx, buf),
// Connected successfully with fast open
*state = TcpStreamState::Connected;
Ok(n).into()
}
}
}
Expand Down
107 changes: 59 additions & 48 deletions crates/shadowsocks/src/net/sys/unix/linux/mod.rs
Expand Up @@ -32,7 +32,7 @@ enum TcpStreamState {
}

/// A `TcpStream` that supports TFO (TCP Fast Open)
#[pin_project]
#[pin_project(project = TcpStreamProj)]
pub struct TcpStream {
#[pin]
inner: TokioTcpStream,
Expand Down Expand Up @@ -177,24 +177,24 @@ impl AsyncRead for TcpStream {
}

impl AsyncWrite for TcpStream {
fn poll_write(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
loop {
let this = self.as_mut().project();
fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
let TcpStreamProj { inner, state } = self.project();

match this.state {
TcpStreamState::FastOpenConnect(addr) => {
// Fallback mode. Must be kernal < 4.11
//
// Uses sendto as BSD-like systems
match *state {
TcpStreamState::Connected => inner.poll_write(cx, buf),

// Wait until socket is writable
ready!(this.inner.poll_write_ready(cx))?;
TcpStreamState::FastOpenConnect(addr) => {
// Fallback mode. Must be kernal < 4.11
//
// Uses sendto as BSD-like systems

unsafe {
let saddr = SockAddr::from(*addr);
let saddr = SockAddr::from(addr);

let stream = inner.get_mut();
let n = ready!(stream.poll_write_io(cx, || {
unsafe {
let ret = libc::sendto(
this.inner.as_raw_fd(),
stream.as_raw_fd(),
buf.as_ptr() as *const libc::c_void,
buf.len(),
libc::MSG_FASTOPEN,
Expand All @@ -203,57 +203,68 @@ impl AsyncWrite for TcpStream {
);

if ret >= 0 {
// Connect successfully.
*(this.state) = TcpStreamState::Connected;
return Ok(ret as usize).into();
Ok(ret as usize)
} else {
// Error occurs
let err = io::Error::last_os_error();

// EAGAIN, EWOULDBLOCK
if err.kind() != ErrorKind::WouldBlock {
// EINPROGRESS
if let Some(libc::EINPROGRESS) = err.raw_os_error() {
// For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available.
// If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect().
//
// So in this state. We have to loop again to call `poll_write` for sending the first packet.
*(this.state) = TcpStreamState::Connected;
} else {
// Other errors
return Err(err).into();
}
// EINPROGRESS
if let Some(libc::EINPROGRESS) = err.raw_os_error() {
// For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available.
// If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect().
//
// So in this state. We have to loop again to call `poll_write` for sending the first packet.
*state = TcpStreamState::Connected;

// Let `try_write_io` clears the write readiness.
Err(ErrorKind::WouldBlock.into())
} else {
// Pending on poll_write_ready
// Other errors, including EAGAIN, EWOULDBLOCK
Err(err)
}
}
}
}
}))?;

TcpStreamState::FastOpenWrite => {
// First `write` after `TCP_FASTOPEN_CONNECT`
// Kernel >= 4.11
// Connect successfully with fast open
*state = TcpStreamState::Connected;
Ok(n).into()
}

match ready!(this.inner.poll_write(cx, buf)) {
Ok(n) => {
*(this.state) = TcpStreamState::Connected;
return Ok(n).into();
}
Err(err) => {
// EAGAIN and EWOULDBLOCK should have been handled by tokio
//
TcpStreamState::FastOpenWrite => {
// First `write` after `TCP_FASTOPEN_CONNECT`
// Kernel >= 4.11

let stream = inner.get_mut();
let n = ready!(stream.poll_write_io(cx, || {
unsafe {
let ret = libc::send(stream.as_raw_fd(), buf.as_ptr() as *const libc::c_void, buf.len(), 0);

if ret >= 0 {
Ok(ret as usize)
} else {
let err = io::Error::last_os_error();
// EINPROGRESS
if let Some(libc::EINPROGRESS) = err.raw_os_error() {
// loop again to call `poll_write` for sending the first packet
*(this.state) = TcpStreamState::Connected;
// For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available.
// If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect().
//
// So in this state. We have to loop again to call `poll_write` for sending the first packet.
*state = TcpStreamState::Connected;

// Let `poll_write_io` clears the write readiness.
Err(ErrorKind::WouldBlock.into())
} else {
return Err(err).into();
// Other errors, including EAGAIN, EWOULDBLOCK
Err(err)
}
}
}
}
}))?;

TcpStreamState::Connected => return this.inner.poll_write(cx, buf),
// Connect successfully with fast open
*state = TcpStreamState::Connected;
Ok(n).into()
}
}
}
Expand Down

0 comments on commit af15d94

Please sign in to comment.