AsyncFd doesn't allow to decide read/write interest making kqueue's fd fail in tokio 0.3 #3072

Closed
HK416-is-all-you-need opened this issue Oct 29, 2020 · 5 comments · Fixed by #3167
Labels: A-tokio (Area: The main tokio crate), C-bug (Category: This is a bug.), M-io (Module: tokio/io)

@HK416-is-all-you-need
Contributor

Version 0.3.2

Platform macOS 10.15

Description
Creating an AsyncFd for a kqueue file descriptor fails with:
Os { code: 22, kind: InvalidInput, message: "Invalid argument" }

The problem is probably in the underlying mio, but I'm not sure how much has changed since 0.6.

PollEvented + mio worked just fine, so can this be considered a regression?

To elaborate: this is the way to create a kqueue-based timer, similar to Linux's timer_fd.

I use nix to get a simplified API over kqueue, and the same code works with 0.2's PollEvented.

I use my own wrapper for the timer and implement AsRawFd for it.

use libc::c_int;
use std::time;

pub struct RawTimer(c_int);

pub trait TimerFd: std::os::unix::io::AsRawFd + Sync + Send + Unpin {
    fn new() -> Self;
    fn set(&mut self, time: time::Duration);
    fn unset(&mut self);
    fn read(&mut self) -> usize;
}

impl std::os::unix::io::AsRawFd for RawTimer {
    #[inline(always)]
    fn as_raw_fd(&self) -> c_int {
        self.0
    }
}

impl TimerFd for RawTimer {
    fn new() -> Self {
        let fd = nix::sys::event::kqueue().unwrap_or(-1);

        // If you hit this, you most likely ran into the OS-imposed limit on the number of file descriptors
        os_assert!(fd != -1);
        Self(fd)
    }

    fn set(&mut self, time: time::Duration) {
        use nix::sys::event::*;

        let flags = EventFlag::EV_ADD | EventFlag::EV_ENABLE | EventFlag::EV_ONESHOT;
        let mut time = time.as_nanos();
        let mut unit = FilterFlag::NOTE_NSECONDS;

        if time > isize::max_value() as u128 {
            unit = FilterFlag::NOTE_USECONDS;
            time /= 1_000;
        }
        if time > isize::max_value() as u128 {
            unit = FilterFlag::empty(); // default is milliseconds
            time /= 1_000;
        }
        if time > isize::max_value() as u128 {
            unit = FilterFlag::NOTE_SECONDS;
            time /= 1_000;
        }

        let time = time as isize;
        kevent(self.0, &[KEvent::new(1, EventFilter::EVFILT_TIMER, flags, unit, time, 0)], &mut [], 0).expect("To arm timer");
    }

    fn unset(&mut self) {
        use nix::sys::event::*;

        let flags = EventFlag::EV_DELETE;
        kevent(self.0, &[KEvent::new(1, EventFilter::EVFILT_TIMER, flags, FilterFlag::empty(), 0, 0)], &mut [], 0).expect("To disarm timer");
    }

    fn read(&mut self) -> usize {
        use nix::sys::event::*;

        let mut ev = [KEvent::new(0, EventFilter::EVFILT_TIMER, EventFlag::empty(), FilterFlag::empty(), 0, 0)];

        kevent(self.0, &[], &mut ev[..], 0).expect("To execute kevent")
    }
}
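For illustration, the unit-selection logic in set() can be isolated into a pure function that is easy to test without kqueue. This is a minimal sketch, not the author's code: `TimerUnit` and `scale_timeout` are hypothetical names that stand in for the nix `FilterFlag` values. It also makes one assumption the snippet above does not: if the value still does not fit in `isize` after falling back to seconds, it saturates instead of wrapping on the `as isize` cast.

```rust
use std::time::Duration;

/// Hypothetical stand-in for the kqueue timer unit flags, finest to coarsest.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TimerUnit {
    Nanos,
    Micros,
    Millis,
    Seconds,
}

/// Picks the finest unit whose value fits in `isize`, dividing by 1000
/// each time the value is too large for the current unit.
fn scale_timeout(timeout: Duration) -> (TimerUnit, isize) {
    let max = isize::MAX as u128;
    let mut value = timeout.as_nanos();
    let mut unit = TimerUnit::Nanos;

    for next in [TimerUnit::Micros, TimerUnit::Millis, TimerUnit::Seconds] {
        if value <= max {
            break;
        }
        value /= 1_000;
        unit = next;
    }

    // Assumption: saturate rather than wrap if even seconds do not fit.
    (unit, value.min(max) as isize)
}
```

A caller would then map the returned unit to the matching filter flag before building the `KEvent`.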
@HK416-is-all-you-need added the A-tokio and C-bug labels Oct 29, 2020
@Darksonn added the M-io label Oct 29, 2020
@carllerche
Member

I don't see you creating the AsyncFd in your snippet. Can you include that?

My guess is that we must filter the interest for some FDs. Can you show me how you were using PollEvented?

@HK416-is-all-you-need
Contributor Author

HK416-is-all-you-need commented Oct 29, 2020

Note that I'm using the same code for both timer_fd and kqueue timers.
The code that fails is: let mut fd = AsyncFd::new(T::new()).expect("To create AsyncFd");

It fails only on macOS, and only when creating the AsyncFd, not when using it.

This is how I use it:

impl<T: TimerFd> Future for AsyncTokioTimer<T> {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context) -> task::Poll<Self::Output> {
        if let State::Init(ref timeout) = &self.state {
            let mut fd = AsyncFd::new(T::new()).expect("To create AsyncFd");
            fd.get_mut().set(*timeout);
            self.state = State::Running(fd, false)
        };

        if let State::Running(ref mut fd, ref mut state) = &mut self.state {
            if *state {
                return task::Poll::Ready(());
            }

            let fd = Pin::new(fd);
            match fd.poll_read_ready(ctx) {
                task::Poll::Pending => return task::Poll::Pending,
                task::Poll::Ready(ready) => {
                    let mut ready = ready.expect("Unable to read async timer's fd");
                    // Technically we should read first, but then we could not borrow fd mutably
                    ready.clear_ready();

                    match fd.get_mut().get_mut().read() {
                        0 => {
                            *state = false;
                            return task::Poll::Pending
                        },
                        _ => {
                            *state = true;
                            return task::Poll::Ready(())
                        }
                    }
                }
            }
        } else {
            unreach!();
        }
    }
}

It is pretty close to what I had with PollEvented (shorter snippet):

let fd = Pin::new(fd);
match fd.poll_read_ready(ctx, mio::Ready::readable()) {
    task::Poll::Pending => return task::Poll::Pending,
    task::Poll::Ready(ready) => match ready.map(|ready| ready.is_readable()).expect("kqueue cannot be ready") {
        true => {
            let _ = fd.clear_read_ready(ctx, mio::Ready::readable());
            match fd.get_mut().get_mut().read() {
                0 => return task::Poll::Pending,
                _ => return task::Poll::Ready(()),
            }
        }
        false => return task::Poll::Pending,
    },
}

@carllerche
Member

How did you create the PollEvented?

@HK416-is-all-you-need
Contributor Author

HK416-is-all-you-need commented Oct 29, 2020

It is similar: let fd = tokio_02::io::PollEvented::new(RawTimer::new()).expect("To create PollEvented");
Now that you ask, I remember one thing.
I think I had to remove the writable interest in my mio::Evented implementation.
The problem is that I'm not sure I can do that with AsyncFd, at least with the current API.
Actually, why does tokio/mio always assume writable?

Here is the full definition of the wrapper:

struct RawTimer(c_int);

impl RawTimer {
    fn new() -> Self {
        let fd = nix::sys::event::kqueue().unwrap_or(-1);

        // If you hit this, you most likely ran into the OS-imposed limit on the number of file descriptors
        os_assert!(fd != -1);
        Self(fd)
    }

    fn set(&self, time: time::Duration) {
        use nix::sys::event::*;

        let flags = EventFlag::EV_ADD | EventFlag::EV_ENABLE | EventFlag::EV_ONESHOT;
        let mut time = time.as_nanos();
        let mut unit = FilterFlag::NOTE_NSECONDS;

        if time > isize::max_value() as u128 {
            unit = FilterFlag::NOTE_USECONDS;
            time /= 1_000;
        }
        if time > isize::max_value() as u128 {
            unit = FilterFlag::empty(); // default is milliseconds
            time /= 1_000;
        }
        if time > isize::max_value() as u128 {
            unit = FilterFlag::NOTE_SECONDS;
            time /= 1_000;
        }

        let time = time as isize;
        kevent(self.0, &[KEvent::new(1, EventFilter::EVFILT_TIMER, flags, unit, time, 0)], &mut [], 0).expect("To arm timer");
    }

    fn unset(&self) {
        use nix::sys::event::*;

        let flags = EventFlag::EV_DELETE;
        kevent(self.0, &[KEvent::new(1, EventFilter::EVFILT_TIMER, flags, FilterFlag::empty(), 0, 0)], &mut [], 0).expect("To disarm timer");
    }

    fn read(&self) -> usize {
        use nix::sys::event::*;

        let mut ev = [KEvent::new(0, EventFilter::EVFILT_TIMER, EventFlag::empty(), FilterFlag::empty(), 0, 0)];

        kevent(self.0, &[], &mut ev[..], 0).expect("To execute kevent")
    }
}

impl mio::Evented for RawTimer {
    fn register(&self, poll: &mio::Poll, token: mio::Token, mut interest: mio::Ready, opts: mio::PollOpt) -> io::Result<()> {
        interest.remove(mio::Ready::writable());
        mio::unix::EventedFd(&self.0).register(poll, token, interest, opts)
    }

    fn reregister(&self, poll: &mio::Poll, token: mio::Token, mut interest: mio::Ready, opts: mio::PollOpt) -> io::Result<()> {
        interest.remove(mio::Ready::writable());
        mio::unix::EventedFd(&self.0).reregister(poll, token, interest, opts)
    }

    fn deregister(&self, poll: &mio::Poll) -> io::Result<()> {
        mio::unix::EventedFd(&self.0).deregister(poll)
    }
}
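The effect of the `interest.remove(mio::Ready::writable())` calls above can be shown with a toy model that needs neither mio nor kqueue. This is only a sketch under stated assumptions: `Interest`, `register_read_only_source`, and `register_filtered` are hypothetical names, and the "selector" here simply rejects write interest with `InvalidInput`, mirroring the error reported in this issue rather than reproducing the real kernel behavior.

```rust
use std::io;

/// Toy readiness-interest bitmask (hypothetical, not Tokio's or mio's type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Interest(u8);

impl Interest {
    const READABLE: Interest = Interest(0b01);
    const WRITABLE: Interest = Interest(0b10);

    /// Read and write interest combined, as a default registration would use.
    fn both() -> Interest {
        Interest(Self::READABLE.0 | Self::WRITABLE.0)
    }

    fn contains(self, other: Interest) -> bool {
        self.0 & other.0 == other.0
    }

    /// Returns `self` with the bits of `other` cleared.
    fn remove(self, other: Interest) -> Interest {
        Interest(self.0 & !other.0)
    }
}

/// Models a source that only supports read readiness: asking for write
/// interest fails with `InvalidInput` (an assumption made for this sketch).
fn register_read_only_source(interest: Interest) -> io::Result<Interest> {
    if interest.contains(Interest::WRITABLE) {
        return Err(io::Error::from(io::ErrorKind::InvalidInput));
    }
    Ok(interest)
}

/// Strips write interest before registering, like the wrapper above does.
fn register_filtered(interest: Interest) -> io::Result<Interest> {
    register_read_only_source(interest.remove(Interest::WRITABLE))
}
```

In this model, registering with `Interest::both()` fails, while `register_filtered(Interest::both())` succeeds with read interest only. That is the behavior the wrapper relied on with PollEvented.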

@HK416-is-all-you-need changed the title from "AsyncFd creation fails for kqueue's fd on mac in tokio 0.3" to "AsyncFd doesn't allow to decide read/write interest making kqueue's fd fail in tokio 0.3" Oct 29, 2020
@carllerche
Member

Once #3130 lands, we can add AsyncFd::with_interest(...) as the Interest type will be public.

carllerche added a commit that referenced this issue Nov 13, 2020
Adds functions to await readiness on the TcpStream, along with non-blocking read/write functions.

`async fn TcpStream::ready(Interest)` waits for socket readiness satisfying **any** of the specified
interests. There are also two shorthand functions, `readable()` and `writable()`.

Once the stream is in a ready state, the caller may perform non-blocking operations on it using
`try_read()` and `try_write()`. These functions return `WouldBlock` if the stream is not, in fact, ready.

The readiness functions are similar to `AsyncFd`, but do not require a guard. The guard in
`AsyncFd` protects against a potential race between receiving the readiness notification and clearing
it. The guard is needed as Tokio does not control the operations. With `TcpStream`, the `try_read()`
and `try_write()` functions handle clearing stream readiness as needed.

This also exposes `Interest` and `Ready`, both defined in Tokio as wrappers for Mio types. These
types will also be useful for fixing #3072 .

Other I/O types, such as `TcpListener`, `UdpSocket`, `Unix*` should get similar functions, but this
is left for later PRs.

Refs: #3130
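
The "attempt the operation and treat `WouldBlock` as not ready" pattern described in the commit message can be sketched with the standard library alone. This is not Tokio's API: it uses `std::net::TcpStream` in non-blocking mode, and `try_read_once` and `loopback_pair` are hypothetical helper names introduced for the example.

```rust
use std::io::{self, Read};
use std::net::{TcpListener, TcpStream};

/// Attempts one non-blocking read. `Ok(None)` means the stream was not
/// ready (the OS reported `WouldBlock`); other errors are passed through.
fn try_read_once(stream: &mut TcpStream, buf: &mut [u8]) -> io::Result<Option<usize>> {
    match stream.read(buf) {
        Ok(n) => Ok(Some(n)),
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => Ok(None),
        Err(e) => Err(e),
    }
}

/// Builds a connected pair of sockets on the loopback interface.
/// The first stream is switched to non-blocking mode; the second stays blocking.
fn loopback_pair() -> io::Result<(TcpStream, TcpStream)> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let client = TcpStream::connect(listener.local_addr()?)?;
    let (server, _) = listener.accept()?;
    client.set_nonblocking(true)?;
    Ok((client, server))
}
```

With an async runtime, the `Ok(None)` case is where the caller would wait for the next readiness notification instead of polling in a loop.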
carllerche pushed a commit that referenced this issue Nov 30, 2020