io: rework I/O driver to use an intrusive linked list for wakers #2779

Closed
carllerche opened this issue Aug 20, 2020 · 6 comments
@carllerche (Member) commented Aug 20, 2020

Refactor I/O driver

Describes changes to the I/O driver for the Tokio 0.3 release.

Goals

  • Support async fn on I/O types with &self.
  • Support concurrent UDP send.
  • Refine the Registration API.
  • Remove the need for "split" style APIs.

Non-goals

  • Implement AsyncRead / AsyncWrite for &TcpStream or other reference types.

Overview

Currently, I/O types require &mut self for async functions. The reason for
this is that the task's waker is stored in the I/O resource's internal state
(ScheduledIo) instead of in the future returned by the async function.
Because of this, I/O types are limited to one waker per direction (a
direction is either read-related events or write-related events).

Moving the waker from the internal I/O resource's state to the operation's
future enables multiple wakers to be registered per operation. The "intrusive
wake list" strategy used by Notify applies to this case, though there are some
concerns unique to the I/O driver.

Reworking the Registration type

While Registration is made private (per #2728), it remains in Tokio as an
implementation detail backing I/O resources such as TcpStream. The API of
Registration is updated to support waiting for an arbitrary interest set with
&self. This supports concurrent waiters, each with a different readiness
interest.

struct Registration { ... }

// TODO: naming
struct ReadyEvent {
    tick: u32,
    ready: mio::Ready,
}

impl Registration {
    /// `interest` must be a super set of **all** interest sets specified in
    /// the other methods. This is the interest set passed to `mio`.
    pub fn new<T>(io: &T, interest: mio::Ready) -> io::Result<Registration>
        where T: mio::Evented;

    /// Waits for any readiness event included in `interest`. Returns a
    /// `ReadyEvent` representing the received readiness event.
    async fn readiness(&self, interest: mio::Ready) -> io::Result<ReadyEvent>;

    /// Clears resource-level readiness represented by the specified
    /// `ReadyEvent`.
    fn clear_readiness(&self, ready_event: ReadyEvent);
}

A new registration is created for a T: mio::Evented and an interest set. This
creates a ScheduledIo entry with the I/O driver and registers the resource
with mio.

Because Tokio uses edge-triggered notifications, the I/O driver only
receives readiness from the OS once the ready state changes. The I/O driver
must therefore track each resource's known readiness state. This avoids
issuing syscalls that are known to return EWOULDBLOCK.

A call to readiness() checks if the currently known resource readiness
overlaps with interest. If it does, readiness() returns immediately. If it
does not, the task waits until the I/O driver receives a readiness event.

The pseudocode to perform a TCP read is as follows.

async fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
    loop {
        // Await readiness
        let event = self.readiness(interest).await?;

        match self.mio_socket.read(buf) {
            Ok(v) => return Ok(v),
            Err(ref e) if e.kind() == WouldBlock => {
                self.clear_readiness(event);
            }
            Err(e) => return Err(e),
        }
    }
}
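Internally, readiness() can be shaped as a fast path over the cached
readiness plus a slow path that parks a waiter. A minimal sketch, where
current_readiness() and wait_for_event() are hypothetical helpers, not part
of the proposed API:

impl Registration {
    async fn readiness(&self, interest: mio::Ready) -> io::Result<ReadyEvent> {
        loop {
            // Fast path: the cached readiness already overlaps `interest`.
            let (tick, ready) = self.current_readiness();
            if !(ready & interest).is_empty() {
                return Ok(ReadyEvent { tick, ready });
            }

            // Slow path: link a `Waiter` carrying `interest` into the
            // intrusive list and suspend until the driver wakes it.
            self.wait_for_event(interest).await?;
        }
    }
}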

Reworking the ScheduledIo type

The ScheduledIo type is switched to use an intrusive waker linked list. Each
entry in the linked list includes the interest set passed to readiness().

#[derive(Debug)]
pub(crate) struct ScheduledIo {
    /// Resource's known state packed with other state that must be
    /// atomically updated.
    readiness: AtomicUsize,

    /// Tracks tasks waiting on the resource
    waiters: Mutex<Waiters>,
}

#[derive(Debug)]
struct Waiters {
    /// List of intrusive waiters.
    list: LinkedList<Waiter>,

    /// Waiter used by `AsyncRead` implementations.
    reader: Option<Waker>,

    /// Waiter used by `AsyncWrite` implementations.
    writer: Option<Waker>,
}

// This struct is contained by the **future** returned by `readiness()`.
#[derive(Debug)]
struct Waiter {
    /// Intrusive linked-list pointers
    pointers: linked_list::Pointers<Waiter>,

    /// Waker for task waiting on I/O resource
    waiter: Option<Waker>,

    /// Readiness events being waited on. This is
    /// the value passed to `readiness()`
    interest: mio::Ready,

    /// Should not be `Unpin`.
    _p: PhantomPinned,
}

When an I/O event is received from mio, the associated resource's readiness is
updated and the waiter list is iterated. All waiters with an interest set that
overlaps the received readiness event are notified. Any waiter with an
interest set that does not overlap the readiness event remains in the list.
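Illustratively, the dispatch step could be shaped like the sketch below.
remove_if is a hypothetical helper on the intrusive list, and the dedicated
reader/writer slots from Waiters are woken alongside it:

fn notify(io: &ScheduledIo, ready: mio::Ready) {
    let mut waiters = io.waiters.lock().unwrap();

    // Wake the dedicated AsyncRead / AsyncWrite slots first.
    if ready.is_readable() {
        if let Some(waker) = waiters.reader.take() {
            waker.wake();
        }
    }
    if ready.is_writable() {
        if let Some(waker) = waiters.writer.take() {
            waker.wake();
        }
    }

    // Wake and unlink only the waiters whose interest overlaps the event;
    // everything else stays linked, still waiting.
    waiters.list.remove_if(|waiter| {
        if (waiter.interest & ready).is_empty() {
            return false; // keep waiting
        }
        if let Some(waker) = waiter.waiter.take() {
            waker.wake();
        }
        true // unlink from the list
    });
}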

Cancel interest on drop

The future returned by readiness() uses an intrusive linked list to store the
waker with ScheduledIo. Because readiness() can be called concurrently, many
wakers may be stored simultaneously in the list. If the readiness() future is
dropped early, it is essential that the waker is removed from the list. This
prevents leaking memory.
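A sketch of that cleanup, assuming Readiness<'_> is the concrete future type
returned by readiness() and that the intrusive list exposes an unsafe
remove():

impl Drop for Readiness<'_> {
    fn drop(&mut self) {
        // Lock the waiter list before touching the intrusive node.
        let mut waiters = self.scheduled_io.waiters.lock().unwrap();

        // Safety: the node is only ever linked into this list, and the
        // lock prevents concurrent access while it is unlinked.
        unsafe {
            waiters.list.remove(&mut self.waiter);
        }
    }
}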

Race condition

Consider how many tasks may concurrently attempt I/O operations. This, combined
with how Tokio uses edge-triggered events, can result in a race condition. Let's
revisit the TCP read function:

async fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
    loop {
        // Await readiness
        let event = self.readiness(interest).await?;

        match self.mio_socket.read(buf) {
            Ok(v) => return Ok(v),
            Err(ref e) if e.kind() == WouldBlock => {
                self.clear_readiness(event);
            }
            Err(e) => return Err(e),
        }
    }
}

If care is not taken, read() could deadlock. Suppose a readiness event
arrives between mio_socket.read(buf) returning and clear_readiness(event)
being called: clear_readiness() then unsets the newly received readiness,
and on the next iteration readiness().await blocks forever, as no further
readiness event will be received.

The current I/O driver handles this condition by always registering the
task's waker before performing the operation. This is not ideal, as it
results in unnecessary task notifications.

Instead, we will use a strategy to prevent clearing readiness if an "unseen"
readiness event has been received. The I/O driver will maintain a "tick" value.
Every time the mio poll() function is called, the tick is incremented. Each
readiness event has an associated tick. When the I/O driver sets the resource's
readiness, the driver's tick is packed into the atomic usize.

The ScheduledIo readiness AtomicUsize is structured as:

| reserved | generation | driver tick | readiness |
|----------|------------|-------------|-----------|
|  1 bit   |   7 bits   |   8 bits    |  16 bits  |
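For concreteness, packing and unpacking the tick and readiness components
might look like this sketch (the constant and helper names are illustrative;
the generation and reserved bits are omitted):

const READINESS_MASK: usize = 0x0000_ffff;
const TICK_SHIFT: u32 = 16;
const TICK_MASK: usize = 0x00ff_0000;

fn pack(tick: u8, readiness: u16) -> usize {
    ((tick as usize) << TICK_SHIFT) | (readiness as usize)
}

fn unpack_tick(value: usize) -> u8 {
    ((value & TICK_MASK) >> TICK_SHIFT) as u8
}

fn unpack_readiness(value: usize) -> u16 {
    (value & READINESS_MASK) as u16
}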

The reserved and generation components exist today.

The readiness() function returns a ReadyEvent value. This value includes the
tick component read along with the resource's readiness value. When
clear_readiness() is called, the ReadyEvent is provided. Readiness is only
cleared if the current tick matches the tick included in the ReadyEvent.
If the tick values do not match, the call to readiness() on the next
iteration will not block, and the new tick is included in the new ReadyEvent.
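A sketch of the tick check in clear_readiness(), reusing the hypothetical
pack/unpack helpers from the sketch above; for brevity this clears the full
readiness value rather than only the event's bits, and ignores the
generation and reserved bits:

use std::sync::atomic::Ordering;

fn clear_readiness(io: &ScheduledIo, event: ReadyEvent) {
    let mut current = io.readiness.load(Ordering::Acquire);

    loop {
        // A newer tick means an unseen readiness event arrived; skip the
        // clear so the next readiness() call observes it.
        if unpack_tick(current) as u32 != event.tick {
            return;
        }

        let next = pack(unpack_tick(current), 0);

        match io.readiness.compare_exchange(
            current,
            next,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            Ok(_) => return,
            Err(actual) => current = actual,
        }
    }
}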

TODO

Implementing AsyncRead / AsyncWrite

The AsyncRead and AsyncWrite traits use a "poll" based API. This means that
it is not possible to use an intrusive linked list to track the waker.
Additionally, there is no future associated with the operation, which means
it is not possible to cancel interest in the readiness events.

To implement AsyncRead and AsyncWrite, ScheduledIo includes dedicated
waker values for the read direction and the write direction. These values are
used to store the waker. Specific interest is not tracked for AsyncRead and
AsyncWrite implementations. It is assumed that the only events of interest
are:

  • Read ready
  • Read closed
  • Write ready
  • Write closed

Note that "read closed" and "write closed" are only available with Mio 0.7. With
Mio 0.6, things were a bit messy.

It is only possible to implement AsyncRead and AsyncWrite for resource types
themselves and not for &Resource. Implementing the traits for &Resource
would permit concurrent operations on the resource. Because only a single
waker is stored per direction, any concurrent usage would result in
deadlocks. An alternate implementation would call for a Vec<Waker>, but this
would result in memory leaks.
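A sketch of what the poll-based read path could look like, assuming a
hypothetical read_ready() accessor on ScheduledIo; the write direction is
symmetric:

use std::task::{Context, Poll};

fn poll_read_ready(io: &ScheduledIo, cx: &mut Context<'_>) -> Poll<ReadyEvent> {
    // Fast path: readiness is already known.
    if let Some(event) = io.read_ready() {
        return Poll::Ready(event);
    }

    // Store (replacing any previous) the single read-direction waker.
    // A second task polling concurrently would clobber this slot, which
    // is why AsyncRead cannot be implemented for &TcpStream.
    io.waiters.lock().unwrap().reader = Some(cx.waker().clone());

    // Re-check to avoid losing a wakeup that raced with the store.
    if let Some(event) = io.read_ready() {
        return Poll::Ready(event);
    }

    Poll::Pending
}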

Enabling reads and writes for &TcpStream

Instead of implementing AsyncRead and AsyncWrite for &TcpStream, a new
function is added to TcpStream.

impl TcpStream {
    /// Naming TBD
    fn by_ref(&self) -> TcpStreamRef<'_>;
}

struct TcpStreamRef<'a> {
    stream: &'a TcpStream,

    // `Waiter` is the node in the intrusive waiter linked-list
    read_waiter: Waiter,
    write_waiter: Waiter,
}

Now, AsyncRead and AsyncWrite can be implemented on TcpStreamRef<'a>. When
the TcpStreamRef is dropped, all associated waker resources are cleaned up.

Removing all the split() functions

With TcpStream::by_ref(), TcpStream::split() is no longer needed. Instead,
it is possible to do something like the following.

let rd = my_stream.by_ref();
let wr = my_stream.by_ref();

select! {
    // use `rd` and `wr` in separate branches.
}
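For illustration, a slightly fuller version of the same pattern, assuming
TcpStreamRef implements AsyncRead and AsyncWrite as described above and the
AsyncReadExt / AsyncWriteExt helpers are in scope; the branch bodies are
made up:

use tokio::io::{AsyncReadExt, AsyncWriteExt};

let mut rd = my_stream.by_ref();
let mut wr = my_stream.by_ref();

let mut buf = [0u8; 1024];

select! {
    res = rd.read(&mut buf) => {
        let n = res?;
        // handle the `n` bytes read into `buf`
    }
    res = wr.write(b"ping") => {
        let _n = res?;
        // handle the completed write
    }
}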

It is also possible to store a TcpStream in an Arc.

let arc_stream = Arc::new(my_tcp_stream);
let n = arc_stream.by_ref().read(buf).await?;
carllerche added C-proposal Category: a proposal and request for comments, A-tokio Area: The main tokio crate, and M-io Module: tokio/io labels Aug 20, 2020
carllerche added this to the v0.3 milestone Aug 20, 2020
carllerche mentioned this issue Aug 21, 2020
@Darksonn (Contributor) commented Sep 2, 2020

This seems to break some abstractions that work now. For example, today you can put the owned read half of a TCP stream into a FramedRead, whereas with this approach you have to put a TcpStreamRef into the FramedRead, meaning the stream is no longer an owned type that can be, e.g., returned from a function.

@seanmonstar (Member) commented

Regarding the by_ref/split stuff:

We want to reduce the number of "split" APIs a user must pick from, like for TCP: there's the by-ref one, the owned one, and the generic io::split. I'd like to propose merging the TCP owned version and the generic one internally, all while only exposing one API.

  • The by-ref one is neat, but I suspect it's more common to want to split across different tasks.
  • We can make the Split type internally be an enum where normally it's a bilock, and tokio types can construct a variant that doesn't need a lock.
  • It does require an Arc either way, which the by-ref version doesn't need.

But in the end, this way means there's only one API a user needs to work with.

carllerche pushed a commit that referenced this issue Sep 23, 2020
This refactors I/O registration in a few ways:

- Cleans up the cached readiness in `PollEvented`. This cache used to
  be helpful when readiness was a linked list of `*mut Node`s in
  `Registration`. Previous refactors have turned `Registration` into just
  an `AtomicUsize` holding the current readiness, so the cache is just
  extra work and complexity. Gone.
- Polling the `Registration` for readiness now gives a `ReadyEvent`,
  which includes the driver tick. This event must be passed back into
  `clear_readiness`, so that the readiness is only cleared from `Registration`
  if the tick hasn't changed. Previously, it was possible to clear the
  readiness even though another thread had *just* polled the driver and
  found the socket ready again.
- Registration now also contains an `async fn readiness`, which stores
  wakers in an intrusive linked list. This allows an unbounded number
  of tasks to register for readiness (previously, only 1 per direction (read
  and write)). By using the intrusive linked list, there is no concern of
  leaking the storage of the wakers, since they are stored inside the `async fn`
  and released when the future is dropped.
- Registration retains a `poll_readiness(Direction)` method, to support
  `AsyncRead` and `AsyncWrite`. They aren't able to use `async fn`s, and
  so there are 2 reserved slots for those methods.
- IO types where it makes sense to have multiple tasks waiting on them
  now take advantage of this new `async fn readiness`, such as `UdpSocket`
  and `UnixDatagram`.

Additionally, this makes the `io-driver` "feature" internal-only (no longer
documented, not part of the public API), and adds a second internal-only
feature, `io-readiness`, to group together the linked-list part of
registration that is only used by some of the IO types.

After a bit of discussion, changing stream-based transports (like
`TcpStream`) to have `async fn read(&self)` is punted, since that
is likely too easy of a footgun to activate.

Refs: #2779, #2728
@leshow (Contributor) commented Sep 25, 2020

I don't see a way to do an owned or by_ref split for UdpSocket currently. Is this planned to work like UnixStream and just not implemented? If so, I'd like to offer up my time to work on it if you don't have anyone else.

edit: silly me, I didn't realize that the API is no longer &mut, so it doesn't need to be split. I'm working on a PR for the docs right now; I can add some reference to this.

@carllerche (Member, Author) commented

@leshow UdpSocket should now take &self for all fns. This means it can be used concurrently and does not require an explicit split.
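
For example, concurrent sends from two tasks, sharing the socket through an
Arc; the addresses and payloads here are illustrative:

use std::sync::Arc;
use tokio::net::UdpSocket;

let socket = Arc::new(UdpSocket::bind("127.0.0.1:0").await?);
let peer = "127.0.0.1:8080";

let s = socket.clone();
tokio::spawn(async move {
    // `send_to` takes `&self`, so no split or lock is needed.
    s.send_to(b"from the task", peer).await.unwrap();
});

socket.send_to(b"from the caller", peer).await?;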

@leshow (Contributor) commented Oct 6, 2020

Yep! That was a misunderstanding on my part. I've actually included an example of the new concurrent send in my PR for the UdpSocket docs.

@carllerche (Member, Author) commented

Enough of this issue has been done for 0.3. The remaining small tweaks for 0.3 are tracked by #2928.
