Skip to content

Commit

Permalink
clearing write readiness if TFO connect returns EINPROGRESS
Browse files Browse the repository at this point in the history
- ref #555
- imperfect until tokio-rs/tokio#3888 was merged
  • Loading branch information
zonyitoo committed Jun 25, 2021
1 parent 3743ffe commit 90e61b5
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 141 deletions.
14 changes: 6 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Expand Up @@ -141,3 +141,6 @@ byteorder = "1.3"
env_logger = "0.8"
byte_string = "1.0"
tokio = { version = "1", features = ["net", "time", "macros", "io-util"]}

[patch.crates-io]
tokio = { git = "https://github.com/zonyitoo/tokio.git" }
71 changes: 40 additions & 31 deletions crates/shadowsocks/src/net/sys/unix/bsd/freebsd.rs
Expand Up @@ -26,10 +26,11 @@ use crate::net::{
enum TcpStreamState {
Connected,
FastOpenConnect(SocketAddr),
FastOpenConnecting,
}

/// A `TcpStream` that supports TFO (TCP Fast Open)
#[pin_project]
#[pin_project(project = TcpStreamProj)]
pub struct TcpStream {
#[pin]
inner: TokioTcpStream,
Expand Down Expand Up @@ -116,47 +117,55 @@ impl AsyncWrite for TcpStream {
// Example program:
// <https://people.freebsd.org/~pkelsey/tfo-tools/tfo-client.c>

// Wait until socket is writable
ready!(this.inner.poll_write_ready(cx))?;
let TcpStreamProj { inner, state } = this;

let stream = inner.get_mut();
let n = ready!(stream.poll_write_io(cx, || {
unsafe {
let saddr = SockAddr::from(*addr);

let ret = libc::sendto(
stream.as_raw_fd(),
buf.as_ptr() as *const libc::c_void,
buf.len(),
0, // Yes, BSD doesn't need MSG_FASTOPEN
saddr.as_ptr(),
saddr.len(),
);

if ret >= 0 {
Ok(ret as usize)
} else {
// Error occurs
let err = io::Error::last_os_error();

unsafe {
let saddr = SockAddr::from(*addr);

let ret = libc::sendto(
this.inner.as_raw_fd(),
buf.as_ptr() as *const libc::c_void,
buf.len(),
0, // Yes, BSD doesn't need MSG_FASTOPEN
saddr.as_ptr(),
saddr.len(),
);

if ret >= 0 {
// Connect successfully.
*(this.state) = TcpStreamState::Connected;
return Ok(ret as usize).into();
} else {
// Error occurs
let err = io::Error::last_os_error();

// EAGAIN, EWOULDBLOCK
if err.kind() != ErrorKind::WouldBlock {
// EINPROGRESS
if let Some(libc::EINPROGRESS) = err.raw_os_error() {
// For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available.
// If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect().
//
// So in this state. We have to loop again to call `poll_write` for sending the first packet.
*(this.state) = TcpStreamState::Connected;
*state = TcpStreamState::FastOpenConnecting;

// Let `poll_write_io` clears the write readiness.
Err(ErrorKind::WouldBlock.into())
} else {
// Other errors
return Err(err).into();
// Other errors, including EAGAIN, EWOULDBLOCK
Err(err)
}
} else {
// Pending on poll_write_ready
}
}
}
}))?;

// Connect successfully with fast open
*state = TcpStreamState::Connected;
return Ok(n).into();
}

TcpStreamState::FastOpenConnecting => {
ready!(this.inner.poll_write_ready(cx))?;

*(this.state) = TcpStreamState::Connected;
}

TcpStreamState::Connected => return this.inner.poll_write(cx, buf),
Expand Down
57 changes: 38 additions & 19 deletions crates/shadowsocks/src/net/sys/unix/bsd/macos.rs
Expand Up @@ -27,10 +27,11 @@ use crate::net::{
enum TcpStreamState {
Connected,
FastOpenWrite,
FastOpenConnecting,
}

/// A `TcpStream` that supports TFO (TCP Fast Open)
#[pin_project]
#[pin_project(project = TcpStreamProj)]
pub struct TcpStream {
#[pin]
inner: TokioTcpStream,
Expand Down Expand Up @@ -129,27 +130,45 @@ impl AsyncWrite for TcpStream {
// (NOT SURE) If remote server doesn't support TFO or this is the first connection,
// it may return EINPROGRESS just like other platforms (Linux, FreeBSD).

match ready!(this.inner.poll_write(cx, buf)) {
Ok(n) => {
*(this.state) = TcpStreamState::Connected;
return Ok(n).into();
}
Err(err) => {
// EAGAIN and EWOULDBLOCK should have been handled by tokio
//
// EINPROGRESS
if let Some(libc::EINPROGRESS) = err.raw_os_error() {
// For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available.
// If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect().
//
// So in this state. We have to loop again to call `poll_write` for sending the first packet.
*(this.state) = TcpStreamState::Connected;
let TcpStreamProj { inner, state } = this;

let stream = inner.get_mut();
let n = ready!(stream.poll_write_io(cx, || {
unsafe {
let ret = libc::send(stream.as_raw_fd(), buf.as_ptr() as *const libc::c_void, buf.len(), 0);
if ret >= 0 {
Ok(ret as usize)
} else {
// Other errors
return Err(err).into();
let err = io::Error::last_os_error();
// EAGAIN and EWOULDBLOCK should have been handled by tokio
//
// EINPROGRESS
if let Some(libc::EINPROGRESS) = err.raw_os_error() {
// For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available.
// If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect().
//
// So in this state. We have to loop again to call `poll_write` for sending the first packet.
*state = TcpStreamState::FastOpenConnecting;

// Let `poll_write_io` clears the write readiness.
Err(ErrorKind::WouldBlock.into())
} else {
// Other errors, including EAGAIN
Err(err)
}
}
}
}
}))?;

// Connected successfully with fast open
*state = TcpStreamState::Connected;
return Ok(n).into();
}

TcpStreamState::FastOpenConnecting => {
ready!(this.inner.poll_write_ready(cx))?;

*(this.state) = TcpStreamState::Connected;
}

TcpStreamState::Connected => return this.inner.poll_write(cx, buf),
Expand Down
116 changes: 71 additions & 45 deletions crates/shadowsocks/src/net/sys/unix/linux/mod.rs
Expand Up @@ -29,10 +29,11 @@ enum TcpStreamState {
Connected,
FastOpenConnect(SocketAddr),
FastOpenWrite,
FastOpenConnecting,
}

/// A `TcpStream` that supports TFO (TCP Fast Open)
#[pin_project]
#[pin_project(project = TcpStreamProj)]
pub struct TcpStream {
#[pin]
inner: TokioTcpStream,
Expand Down Expand Up @@ -187,70 +188,95 @@ impl AsyncWrite for TcpStream {
//
// Uses sendto as BSD-like systems

// Wait until socket is writable
ready!(this.inner.poll_write_ready(cx))?;
let TcpStreamProj { inner, state } = this;

let stream = inner.get_mut();
let n = ready!(stream.poll_write_io(cx, || {
unsafe {
let saddr = SockAddr::from(*addr);

let ret = libc::sendto(
stream.as_raw_fd(),
buf.as_ptr() as *const libc::c_void,
buf.len(),
libc::MSG_FASTOPEN,
saddr.as_ptr(),
saddr.len(),
);

if ret >= 0 {
Ok(ret as usize)
} else {
// Error occurs
let err = io::Error::last_os_error();

unsafe {
let saddr = SockAddr::from(*addr);

let ret = libc::sendto(
this.inner.as_raw_fd(),
buf.as_ptr() as *const libc::c_void,
buf.len(),
libc::MSG_FASTOPEN,
saddr.as_ptr(),
saddr.len(),
);

if ret >= 0 {
// Connect successfully.
*(this.state) = TcpStreamState::Connected;
return Ok(ret as usize).into();
} else {
// Error occurs
let err = io::Error::last_os_error();

// EAGAIN, EWOULDBLOCK
if err.kind() != ErrorKind::WouldBlock {
// EINPROGRESS
if let Some(libc::EINPROGRESS) = err.raw_os_error() {
// For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available.
// If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect().
//
// So in this state. We have to loop again to call `poll_write` for sending the first packet.
*(this.state) = TcpStreamState::Connected;
*state = TcpStreamState::FastOpenConnecting;

// Let `try_write_io` clears the write readiness.
Err(ErrorKind::WouldBlock.into())
} else {
// Other errors
return Err(err).into();
// Other errors, including EAGAIN, EWOULDBLOCK
Err(err)
}
} else {
// Pending on poll_write_ready
}
}
}
}))?;

// Connect successfully with fast open
*state = TcpStreamState::Connected;
return Ok(n).into();
}

TcpStreamState::FastOpenWrite => {
// First `write` after `TCP_FASTOPEN_CONNECT`
// Kernel >= 4.11

match ready!(this.inner.poll_write(cx, buf)) {
Ok(n) => {
*(this.state) = TcpStreamState::Connected;
return Ok(n).into();
}
Err(err) => {
// EAGAIN and EWOULDBLOCK should have been handled by tokio
//
// EINPROGRESS
if let Some(libc::EINPROGRESS) = err.raw_os_error() {
// loop again to call `poll_write` for sending the first packet
*(this.state) = TcpStreamState::Connected;
let n = ready!(this.inner.get_mut().poll_write_io(cx, || {
unsafe {
let ret = libc::send(
this.inner.as_raw_fd(),
buf.as_ptr() as *const libc::c_void,
buf.len(),
0,
);

if ret >= 0 {
Ok(ret as usize)
} else {
return Err(err).into();
let err = io::Error::last_os_error();
// EINPROGRESS
if let Some(libc::EINPROGRESS) = err.raw_os_error() {
// For non-blocking socket, it returns the number of bytes queued (and transmitted in the SYN-data packet) if cookie is available.
// If cookie is not available, it transmits a data-less SYN packet with Fast Open cookie request option and returns -EINPROGRESS like connect().
//
// So in this state. We have to loop again to call `poll_write` for sending the first packet.
*(this.state) = TcpStreamState::FastOpenConnecting;

// Let `poll_write_io` clears the write readiness.
Err(ErrorKind::WouldBlock.into())
} else {
// Other errors, including EAGAIN, EWOULDBLOCK
Err(err)
}
}
}
}
}))?;

// Connect successfully with fast open
*(this.state) = TcpStreamState::Connected;
return Ok(n).into();
}

TcpStreamState::FastOpenConnecting => {
ready!(this.inner.poll_write_ready(cx))?;

*(this.state) = TcpStreamState::Connected;
}

TcpStreamState::Connected => return this.inner.poll_write(cx, buf),
Expand Down

0 comments on commit 90e61b5

Please sign in to comment.