From 0960926bcfc482187428ecdd2b276ec98a186f4a Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 4 Mar 2016 16:34:54 +0100 Subject: [PATCH 1/3] kqueue: wait without timeout As noted in #24, reading from the kqueue won't unblock when it is closed on all platforms. This commit solves the issue by additionally watching on a pipe which is closed when Watcher is closed. The read timeout is no longer necessary. --- kqueue.go | 83 ++++++++++++++++++++++++++++--------------------------- 1 file changed, 43 insertions(+), 40 deletions(-) diff --git a/kqueue.go b/kqueue.go index c2b4acb1..cfa61610 100644 --- a/kqueue.go +++ b/kqueue.go @@ -13,7 +13,6 @@ import ( "os" "path/filepath" "sync" - "time" "golang.org/x/sys/unix" ) @@ -22,9 +21,9 @@ import ( type Watcher struct { Events chan Event Errors chan error - done chan bool // Channel for sending a "quit message" to the reader goroutine - kq int // File descriptor (as returned by the kqueue() syscall). + kq int // File descriptor (as returned by the kqueue() syscall). + closepipe [2]int // Pipe used for closing. mu sync.Mutex // Protects access to watcher data watches map[string]int // Map of watched file descriptors (key: path). @@ -42,13 +41,14 @@ type pathInfo struct { // NewWatcher establishes a new watcher with the underlying OS and begins waiting for events. func NewWatcher() (*Watcher, error) { - kq, err := kqueue() + kq, closepipe, err := kqueue() if err != nil { return nil, err } w := &Watcher{ kq: kq, + closepipe: closepipe, watches: make(map[string]int), dirFlags: make(map[string]uint32), paths: make(map[int]pathInfo), @@ -56,7 +56,6 @@ func NewWatcher() (*Watcher, error) { externalWatches: make(map[string]bool), Events: make(chan Event), Errors: make(chan error), - done: make(chan bool), } go w.readEvents() @@ -89,8 +88,8 @@ func (w *Watcher) Close() error { } } - // Send "quit" message to the reader goroutine: - w.done <- true + // Send "quit" message to the reader goroutine. + unix.Close(w.closepipe[1]) return nil } @@ -155,9 +154,6 @@ func (w *Watcher) Remove(name string) error { // Watch all events (except NOTE_EXTEND, NOTE_LINK, NOTE_REVOKE) const noteAllEvents = unix.NOTE_DELETE | unix.NOTE_WRITE | unix.NOTE_ATTRIB | unix.NOTE_RENAME -// keventWaitTime to block on each read from kevent -var keventWaitTime = durationToTimespec(100 * time.Millisecond) - // addWatch adds name to the watched file set. // The flags are interpreted as described in kevent(2). // Returns the real path to the file which was added, if any, which may be different from the one passed in the case of symlinks. @@ -265,23 +261,18 @@ func (w *Watcher) addWatch(name string, flags uint32) (string, error) { // Event values that it sends down the Events channel. func (w *Watcher) readEvents() { eventBuffer := make([]unix.Kevent_t, 10) - - for { - // See if there is a message on the "done" channel - select { - case <-w.done: - err := unix.Close(w.kq) - if err != nil { - w.Errors <- err - } - close(w.Events) - close(w.Errors) - return - default: + defer func() { + err := unix.Close(w.kq) + if err != nil { + w.Errors <- err } + unix.Close(w.closepipe[0]) + close(w.Events) + close(w.Errors) + }() - // Get new events - kevents, err := read(w.kq, eventBuffer, &keventWaitTime) + for closed := false; !closed; { + kevents, err := read(w.kq, eventBuffer) // EINTR is okay, the syscall was interrupted before timeout expired. if err != nil && err != unix.EINTR { w.Errors <- err @@ -289,10 +280,16 @@ func (w *Watcher) readEvents() { } // Flush the events we received to the Events channel - for len(kevents) > 0 { - kevent := &kevents[0] + for _, kevent := range kevents { watchfd := int(kevent.Ident) mask := uint32(kevent.Fflags) + // Shut down the loop when the pipe is closed, + // but only after all other events have been processed. + if watchfd == w.closepipe[0] { + closed = true + continue + } + w.mu.Lock() path := w.paths[watchfd] w.mu.Unlock() @@ -347,9 +344,6 @@ func (w *Watcher) readEvents() { } } } - - // Move to next event - kevents = kevents[1:] } } } @@ -461,12 +455,26 @@ func (w *Watcher) internalWatch(name string, fileInfo os.FileInfo) (string, erro } // kqueue creates a new kernel event queue and returns a descriptor. -func kqueue() (kq int, err error) { +func kqueue() (kq int, closepipe [2]int, err error) { kq, err = unix.Kqueue() if kq == -1 { - return kq, err + return kq, closepipe, err + } + // Register the close pipe. + if err := unix.Pipe(closepipe[:]); err != nil { + unix.Close(kq) + return kq, closepipe, err } - return kq, nil + events := make([]unix.Kevent_t, 1) + const pflag = unix.EV_ADD | unix.EV_ENABLE | unix.EV_ONESHOT + unix.SetKevent(&events[0], closepipe[0], unix.EVFILT_READ, pflag) + if ok, err := unix.Kevent(kq, events, nil, nil); ok == -1 { + unix.Close(kq) + unix.Close(closepipe[0]) + unix.Close(closepipe[1]) + return kq, closepipe, err + } + return kq, closepipe, nil } // register events with the queue @@ -489,15 +497,10 @@ func register(kq int, fds []int, flags int, fflags uint32) error { // read retrieves pending events, or waits until an event occurs. // A timeout of nil blocks indefinitely, while 0 polls the queue. -func read(kq int, events []unix.Kevent_t, timeout *unix.Timespec) ([]unix.Kevent_t, error) { - n, err := unix.Kevent(kq, nil, events, timeout) +func read(kq int, events []unix.Kevent_t) ([]unix.Kevent_t, error) { + n, err := unix.Kevent(kq, nil, events, nil) if err != nil { return nil, err } return events[0:n], nil } - -// durationToTimespec prepares a timeout value -func durationToTimespec(d time.Duration) unix.Timespec { - return unix.NsecToTimespec(d.Nanoseconds()) -} From adf163de63d142c40c3e51c512abfc4fa342a1bb Mon Sep 17 00:00:00 2001 From: Martin Tournoij Date: Mon, 1 Aug 2022 22:35:03 +0200 Subject: [PATCH 2/3] We need the done chan for errors --- kqueue.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/kqueue.go b/kqueue.go index b6b98df6..789fbe1f 100644 --- a/kqueue.go +++ b/kqueue.go @@ -22,6 +22,7 @@ import ( type Watcher struct { Events chan Event Errors chan error + done chan bool kq int // File descriptor (as returned by the kqueue() syscall). closepipe [2]int // Pipe used for closing. @@ -87,6 +88,7 @@ func (w *Watcher) Close() error { } // Send "quit" message to the reader goroutine. + w.done <- true unix.Close(w.closepipe[1]) return nil @@ -314,7 +316,8 @@ func (w *Watcher) readEvents() { select { case w.Errors <- err: case <-w.done: - break loop + closed = true + continue } continue } @@ -360,7 +363,8 @@ func (w *Watcher) readEvents() { select { case w.Events <- event: case <-w.done: - break loop + closed = true + continue } } From fe3d62d1c668ae965ee725d8f9699a0b3bac4db4 Mon Sep 17 00:00:00 2001 From: Martin Tournoij Date: Mon, 1 Aug 2022 22:39:09 +0200 Subject: [PATCH 3/3] Fix up this PR after merge The timeout for unix.Kevent() is causing issues; every 100ms it will do a new unix.Kevent() syscall, which isn't too efficient: even if you have just one change an hour, you will still keep calling kevent() ten times per second, resulting in a needlessly high CPU usage. Without a timeout, kevent() will block indefinitely until there are some events, which is much more efficient. We can't just remove the timout however, since we can't interrupt the kevent() call on FreeBSD and NetBSD, and it will hang forever. This issue is described in more detail here: https://github.com/fsnotify/fsnotify/pull/262#issuecomment-1201655890 To solve this we register a new kevent() with the file descriptor set to the closepipe; when this pipe is closed an event is sent and kevent() will stop blocking. This is a rebased version of #124. Fixes #89 Fixes #237 Fixes #333 Supersedes and closes #124 Supersedes and closes #262 Supersedes and closes #334 --- kqueue.go | 58 +++++++++++++++++++++++++++---------------------------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/kqueue.go b/kqueue.go index 789fbe1f..756e51e7 100644 --- a/kqueue.go +++ b/kqueue.go @@ -22,7 +22,7 @@ import ( type Watcher struct { Events chan Event Errors chan error - done chan bool + done chan struct{} kq int // File descriptor (as returned by the kqueue() syscall). closepipe [2]int // Pipe used for closing. @@ -60,6 +60,7 @@ func NewWatcher() (*Watcher, error) { externalWatches: make(map[string]bool), Events: make(chan Event), Errors: make(chan error), + done: make(chan struct{}), } go w.readEvents() @@ -88,7 +89,6 @@ func (w *Watcher) Close() error { } // Send "quit" message to the reader goroutine. - w.done <- true unix.Close(w.closepipe[1]) return nil @@ -113,8 +113,8 @@ func (w *Watcher) Remove(name string) error { return fmt.Errorf("%w: %s", ErrNonExistentWatch, name) } - const registerRemove = unix.EV_DELETE - if err := register(w.kq, []int{watchfd}, registerRemove, 0); err != nil { + err := register(w.kq, []int{watchfd}, unix.EV_DELETE, 0) + if err != nil { return err } @@ -253,8 +253,8 @@ func (w *Watcher) addWatch(name string, flags uint32) (string, error) { isDir = fi.IsDir() } - const registerAdd = unix.EV_ADD | unix.EV_CLEAR | unix.EV_ENABLE - if err := register(w.kq, []int{watchfd}, registerAdd, flags); err != nil { + err := register(w.kq, []int{watchfd}, unix.EV_ADD|unix.EV_CLEAR|unix.EV_ENABLE, flags) + if err != nil { unix.Close(watchfd) return "", err } @@ -305,6 +305,7 @@ func (w *Watcher) readEvents() { w.Errors <- err } unix.Close(w.closepipe[0]) + close(w.done) close(w.Events) close(w.Errors) }() @@ -326,8 +327,8 @@ func (w *Watcher) readEvents() { for _, kevent := range kevents { watchfd := int(kevent.Ident) mask := uint32(kevent.Fflags) - // Shut down the loop when the pipe is closed, - // but only after all other events have been processed. + // Shut down the loop when the pipe is closed, but only after all + // other events have been processed. if watchfd == w.closepipe[0] { closed = true continue @@ -394,18 +395,6 @@ func (w *Watcher) readEvents() { } } } - - // cleanup - err := unix.Close(w.kq) - if err != nil { - // only way the previous loop breaks is if w.done was closed so we need to async send to w.Errors. - select { - case w.Errors <- err: - default: - } - } - close(w.Events) - close(w.Errors) } // newEvent returns an platform-independent Event based on kqueue Fflags. @@ -522,20 +511,32 @@ func (w *Watcher) internalWatch(name string, fileInfo os.FileInfo) (string, erro } // kqueue creates a new kernel event queue and returns a descriptor. +// +// This registers a new event on closepipe, which will trigger an event when +// it's closed. This way we can use kevent() without timeout/polling; without +// the closepipe, it would block forever and we wouldn't be able to stop it at +// all. func kqueue() (kq int, closepipe [2]int, err error) { kq, err = unix.Kqueue() if kq == -1 { return kq, closepipe, err } + // Register the close pipe. - if err := unix.Pipe(closepipe[:]); err != nil { + err = unix.Pipe(closepipe[:]) + if err != nil { unix.Close(kq) return kq, closepipe, err } - events := make([]unix.Kevent_t, 1) - const pflag = unix.EV_ADD | unix.EV_ENABLE | unix.EV_ONESHOT - unix.SetKevent(&events[0], closepipe[0], unix.EVFILT_READ, pflag) - if ok, err := unix.Kevent(kq, events, nil, nil); ok == -1 { + + // Register changes to listen on the closepipe. + changes := make([]unix.Kevent_t, 1) + // SetKevent converts int to the platform-specific types. + unix.SetKevent(&changes[0], closepipe[0], unix.EVFILT_READ, + unix.EV_ADD|unix.EV_ENABLE|unix.EV_ONESHOT) + + ok, err := unix.Kevent(kq, changes, nil, nil) + if ok == -1 { unix.Close(kq) unix.Close(closepipe[0]) unix.Close(closepipe[1]) @@ -544,17 +545,16 @@ func kqueue() (kq int, closepipe [2]int, err error) { return kq, closepipe, nil } -// register events with the queue +// Register events with the queue. func register(kq int, fds []int, flags int, fflags uint32) error { changes := make([]unix.Kevent_t, len(fds)) - for i, fd := range fds { - // SetKevent converts int to the platform-specific types: + // SetKevent converts int to the platform-specific types. unix.SetKevent(&changes[i], fd, unix.EVFILT_VNODE, flags) changes[i].Fflags = fflags } - // register the events + // Register the events. success, err := unix.Kevent(kq, changes, nil, nil) if success == -1 { return err