From 4b43fadabe139cc8987266635f8e5fe45feea710 Mon Sep 17 00:00:00 2001 From: Martin Tournoij Date: Sat, 6 Aug 2022 18:17:18 +0200 Subject: [PATCH] kqueue: remove timeout from unix.Kevent() (#480) The timeout for unix.Kevent() is causing issues; every 100ms it will do a new unix.Kevent() syscall, which isn't too efficient: even if you have just one change an hour, you will still keep calling kevent() ten times per second, resulting in a needlessly high CPU usage. Without a timeout, kevent() will block indefinitely until there are some events, which is much more efficient. We can't just remove the timout however, since we can't interrupt the kevent() call on FreeBSD and NetBSD, and it will hang forever. This issue is described in more detail here: https://github.com/fsnotify/fsnotify/pull/262#issuecomment-1201655890 To solve this we register a new kevent() with the file descriptor set to the closepipe; when this pipe is closed an event is sent and kevent() will stop blocking. This is a rebased version of #124. Fixes #89 Fixes #237 Fixes #333 Supersedes and closes #124 Supersedes and closes #262 Supersedes and closes #334 Co-authored-by: Felix Lange --- kqueue.go | 127 ++++++++++++++++++++++++++++++------------------------ 1 file changed, 70 insertions(+), 57 deletions(-) diff --git a/kqueue.go b/kqueue.go index f188fd86..c63e7b61 100644 --- a/kqueue.go +++ b/kqueue.go @@ -14,7 +14,6 @@ import ( "os" "path/filepath" "sync" - "time" "golang.org/x/sys/unix" ) @@ -23,9 +22,10 @@ import ( type Watcher struct { Events chan Event Errors chan error - done chan struct{} // Channel for sending a "quit message" to the reader goroutine + done chan struct{} - kq int // File descriptor (as returned by the kqueue() syscall). + kq int // File descriptor (as returned by the kqueue() syscall). + closepipe [2]int // Pipe used for closing. mu sync.Mutex // Protects access to watcher data watches map[string]int // Watched file descriptors (key: path). @@ -44,13 +44,14 @@ type pathInfo struct { // NewWatcher establishes a new watcher with the underlying OS and begins waiting for events. func NewWatcher() (*Watcher, error) { - kq, err := kqueue() + kq, closepipe, err := kqueue() if err != nil { return nil, err } w := &Watcher{ kq: kq, + closepipe: closepipe, watches: make(map[string]int), watchesByDir: make(map[string]map[int]struct{}), dirFlags: make(map[string]uint32), @@ -87,8 +88,8 @@ func (w *Watcher) Close() error { w.Remove(name) } - // send a "quit" message to the reader goroutine - close(w.done) + // Send "quit" message to the reader goroutine. + unix.Close(w.closepipe[1]) return nil } @@ -112,8 +113,8 @@ func (w *Watcher) Remove(name string) error { return fmt.Errorf("%w: %s", ErrNonExistentWatch, name) } - const registerRemove = unix.EV_DELETE - if err := register(w.kq, []int{watchfd}, registerRemove, 0); err != nil { + err := register(w.kq, []int{watchfd}, unix.EV_DELETE, 0) + if err != nil { return err } @@ -172,9 +173,6 @@ func (w *Watcher) WatchList() []string { // Watch all events (except NOTE_EXTEND, NOTE_LINK, NOTE_REVOKE) const noteAllEvents = unix.NOTE_DELETE | unix.NOTE_WRITE | unix.NOTE_ATTRIB | unix.NOTE_RENAME -// keventWaitTime to block on each read from kevent -var keventWaitTime = durationToTimespec(100 * time.Millisecond) - // addWatch adds name to the watched file set. // The flags are interpreted as described in kevent(2). // Returns the real path to the file which was added, if any, which may be different from the one passed in the case of symlinks. @@ -255,8 +253,8 @@ func (w *Watcher) addWatch(name string, flags uint32) (string, error) { isDir = fi.IsDir() } - const registerAdd = unix.EV_ADD | unix.EV_CLEAR | unix.EV_ENABLE - if err := register(w.kq, []int{watchfd}, registerAdd, flags); err != nil { + err := register(w.kq, []int{watchfd}, unix.EV_ADD|unix.EV_CLEAR|unix.EV_ENABLE, flags) + if err != nil { unix.Close(watchfd) return "", err } @@ -301,35 +299,44 @@ func (w *Watcher) addWatch(name string, flags uint32) (string, error) { // Event values that it sends down the Events channel. func (w *Watcher) readEvents() { eventBuffer := make([]unix.Kevent_t, 10) - -loop: - for { - // See if there is a message on the "done" channel - select { - case <-w.done: - break loop - default: + defer func() { + err := unix.Close(w.kq) + if err != nil { + w.Errors <- err } - - // Get new events - kevents, err := read(w.kq, eventBuffer, &keventWaitTime) + unix.Close(w.closepipe[0]) + close(w.done) + close(w.Events) + close(w.Errors) + }() + + for closed := false; !closed; { + kevents, err := read(w.kq, eventBuffer) // EINTR is okay, the syscall was interrupted before timeout expired. if err != nil && err != unix.EINTR { select { case w.Errors <- err: case <-w.done: - break loop + closed = true + continue } continue } // Flush the events we received to the Events channel - for len(kevents) > 0 { + for _, kevent := range kevents { var ( - kevent = &kevents[0] watchfd = int(kevent.Ident) mask = uint32(kevent.Fflags) ) + + // Shut down the loop when the pipe is closed, but only after all + // other events have been processed. + if watchfd == w.closepipe[0] { + closed = true + continue + } + w.mu.Lock() path := w.paths[watchfd] w.mu.Unlock() @@ -360,7 +367,8 @@ loop: select { case w.Events <- event: case <-w.done: - break loop + closed = true + continue } } @@ -388,23 +396,8 @@ loop: } } } - - // Move to next event - kevents = kevents[1:] } } - - // cleanup - err := unix.Close(w.kq) - if err != nil { - // only way the previous loop breaks is if w.done was closed so we need to async send to w.Errors. - select { - case w.Errors <- err: - default: - } - } - close(w.Events) - close(w.Errors) } // newEvent returns an platform-independent Event based on kqueue Fflags. @@ -517,25 +510,50 @@ func (w *Watcher) internalWatch(name string, fileInfo os.FileInfo) (string, erro } // kqueue creates a new kernel event queue and returns a descriptor. -func kqueue() (kq int, err error) { +// +// This registers a new event on closepipe, which will trigger an event when +// it's closed. This way we can use kevent() without timeout/polling; without +// the closepipe, it would block forever and we wouldn't be able to stop it at +// all. +func kqueue() (kq int, closepipe [2]int, err error) { kq, err = unix.Kqueue() if kq == -1 { - return kq, err + return kq, closepipe, err + } + + // Register the close pipe. + err = unix.Pipe(closepipe[:]) + if err != nil { + unix.Close(kq) + return kq, closepipe, err + } + + // Register changes to listen on the closepipe. + changes := make([]unix.Kevent_t, 1) + // SetKevent converts int to the platform-specific types. + unix.SetKevent(&changes[0], closepipe[0], unix.EVFILT_READ, + unix.EV_ADD|unix.EV_ENABLE|unix.EV_ONESHOT) + + ok, err := unix.Kevent(kq, changes, nil, nil) + if ok == -1 { + unix.Close(kq) + unix.Close(closepipe[0]) + unix.Close(closepipe[1]) + return kq, closepipe, err } - return kq, nil + return kq, closepipe, nil } -// register events with the queue +// Register events with the queue. func register(kq int, fds []int, flags int, fflags uint32) error { changes := make([]unix.Kevent_t, len(fds)) - for i, fd := range fds { - // SetKevent converts int to the platform-specific types: + // SetKevent converts int to the platform-specific types. unix.SetKevent(&changes[i], fd, unix.EVFILT_VNODE, flags) changes[i].Fflags = fflags } - // register the events + // Register the events. success, err := unix.Kevent(kq, changes, nil, nil) if success == -1 { return err @@ -545,15 +563,10 @@ func register(kq int, fds []int, flags int, fflags uint32) error { // read retrieves pending events, or waits until an event occurs. // A timeout of nil blocks indefinitely, while 0 polls the queue. -func read(kq int, events []unix.Kevent_t, timeout *unix.Timespec) ([]unix.Kevent_t, error) { - n, err := unix.Kevent(kq, nil, events, timeout) +func read(kq int, events []unix.Kevent_t) ([]unix.Kevent_t, error) { + n, err := unix.Kevent(kq, nil, events, nil) if err != nil { return nil, err } return events[0:n], nil } - -// durationToTimespec prepares a timeout value -func durationToTimespec(d time.Duration) unix.Timespec { - return unix.NsecToTimespec(d.Nanoseconds()) -}