diff --git a/kqueue.go b/kqueue.go index f188fd86..c63e7b61 100644 --- a/kqueue.go +++ b/kqueue.go @@ -14,7 +14,6 @@ import ( "os" "path/filepath" "sync" - "time" "golang.org/x/sys/unix" ) @@ -23,9 +22,10 @@ import ( type Watcher struct { Events chan Event Errors chan error - done chan struct{} // Channel for sending a "quit message" to the reader goroutine + done chan struct{} - kq int // File descriptor (as returned by the kqueue() syscall). + kq int // File descriptor (as returned by the kqueue() syscall). + closepipe [2]int // Pipe used for closing. mu sync.Mutex // Protects access to watcher data watches map[string]int // Watched file descriptors (key: path). @@ -44,13 +44,14 @@ type pathInfo struct { // NewWatcher establishes a new watcher with the underlying OS and begins waiting for events. func NewWatcher() (*Watcher, error) { - kq, err := kqueue() + kq, closepipe, err := kqueue() if err != nil { return nil, err } w := &Watcher{ kq: kq, + closepipe: closepipe, watches: make(map[string]int), watchesByDir: make(map[string]map[int]struct{}), dirFlags: make(map[string]uint32), @@ -87,8 +88,8 @@ func (w *Watcher) Close() error { w.Remove(name) } - // send a "quit" message to the reader goroutine - close(w.done) + // Send "quit" message to the reader goroutine. + unix.Close(w.closepipe[1]) return nil } @@ -112,8 +113,8 @@ func (w *Watcher) Remove(name string) error { return fmt.Errorf("%w: %s", ErrNonExistentWatch, name) } - const registerRemove = unix.EV_DELETE - if err := register(w.kq, []int{watchfd}, registerRemove, 0); err != nil { + err := register(w.kq, []int{watchfd}, unix.EV_DELETE, 0) + if err != nil { return err } @@ -172,9 +173,6 @@ func (w *Watcher) WatchList() []string { // Watch all events (except NOTE_EXTEND, NOTE_LINK, NOTE_REVOKE) const noteAllEvents = unix.NOTE_DELETE | unix.NOTE_WRITE | unix.NOTE_ATTRIB | unix.NOTE_RENAME -// keventWaitTime to block on each read from kevent -var keventWaitTime = durationToTimespec(100 * time.Millisecond) - // addWatch adds name to the watched file set. // The flags are interpreted as described in kevent(2). // Returns the real path to the file which was added, if any, which may be different from the one passed in the case of symlinks. @@ -255,8 +253,8 @@ func (w *Watcher) addWatch(name string, flags uint32) (string, error) { isDir = fi.IsDir() } - const registerAdd = unix.EV_ADD | unix.EV_CLEAR | unix.EV_ENABLE - if err := register(w.kq, []int{watchfd}, registerAdd, flags); err != nil { + err := register(w.kq, []int{watchfd}, unix.EV_ADD|unix.EV_CLEAR|unix.EV_ENABLE, flags) + if err != nil { unix.Close(watchfd) return "", err } @@ -301,35 +299,44 @@ func (w *Watcher) addWatch(name string, flags uint32) (string, error) { // Event values that it sends down the Events channel. func (w *Watcher) readEvents() { eventBuffer := make([]unix.Kevent_t, 10) - -loop: - for { - // See if there is a message on the "done" channel - select { - case <-w.done: - break loop - default: + defer func() { + err := unix.Close(w.kq) + if err != nil { + w.Errors <- err } - - // Get new events - kevents, err := read(w.kq, eventBuffer, &keventWaitTime) + unix.Close(w.closepipe[0]) + close(w.done) + close(w.Events) + close(w.Errors) + }() + + for closed := false; !closed; { + kevents, err := read(w.kq, eventBuffer) // EINTR is okay, the syscall was interrupted before timeout expired. if err != nil && err != unix.EINTR { select { case w.Errors <- err: case <-w.done: - break loop + closed = true + continue } continue } // Flush the events we received to the Events channel - for len(kevents) > 0 { + for _, kevent := range kevents { var ( - kevent = &kevents[0] watchfd = int(kevent.Ident) mask = uint32(kevent.Fflags) ) + + // Shut down the loop when the pipe is closed, but only after all + // other events have been processed. + if watchfd == w.closepipe[0] { + closed = true + continue + } + w.mu.Lock() path := w.paths[watchfd] w.mu.Unlock() @@ -360,7 +367,8 @@ loop: select { case w.Events <- event: case <-w.done: - break loop + closed = true + continue } } @@ -388,23 +396,8 @@ loop: } } } - - // Move to next event - kevents = kevents[1:] } } - - // cleanup - err := unix.Close(w.kq) - if err != nil { - // only way the previous loop breaks is if w.done was closed so we need to async send to w.Errors. - select { - case w.Errors <- err: - default: - } - } - close(w.Events) - close(w.Errors) } // newEvent returns an platform-independent Event based on kqueue Fflags. @@ -517,25 +510,50 @@ func (w *Watcher) internalWatch(name string, fileInfo os.FileInfo) (string, erro } // kqueue creates a new kernel event queue and returns a descriptor. -func kqueue() (kq int, err error) { +// +// This registers a new event on closepipe, which will trigger an event when +// it's closed. This way we can use kevent() without timeout/polling; without +// the closepipe, it would block forever and we wouldn't be able to stop it at +// all. +func kqueue() (kq int, closepipe [2]int, err error) { kq, err = unix.Kqueue() if kq == -1 { - return kq, err + return kq, closepipe, err + } + + // Register the close pipe. + err = unix.Pipe(closepipe[:]) + if err != nil { + unix.Close(kq) + return kq, closepipe, err + } + + // Register changes to listen on the closepipe. + changes := make([]unix.Kevent_t, 1) + // SetKevent converts int to the platform-specific types. + unix.SetKevent(&changes[0], closepipe[0], unix.EVFILT_READ, + unix.EV_ADD|unix.EV_ENABLE|unix.EV_ONESHOT) + + ok, err := unix.Kevent(kq, changes, nil, nil) + if ok == -1 { + unix.Close(kq) + unix.Close(closepipe[0]) + unix.Close(closepipe[1]) + return kq, closepipe, err } - return kq, nil + return kq, closepipe, nil } -// register events with the queue +// Register events with the queue. func register(kq int, fds []int, flags int, fflags uint32) error { changes := make([]unix.Kevent_t, len(fds)) - for i, fd := range fds { - // SetKevent converts int to the platform-specific types: + // SetKevent converts int to the platform-specific types. unix.SetKevent(&changes[i], fd, unix.EVFILT_VNODE, flags) changes[i].Fflags = fflags } - // register the events + // Register the events. success, err := unix.Kevent(kq, changes, nil, nil) if success == -1 { return err @@ -545,15 +563,10 @@ func register(kq int, fds []int, flags int, fflags uint32) error { // read retrieves pending events, or waits until an event occurs. // A timeout of nil blocks indefinitely, while 0 polls the queue. -func read(kq int, events []unix.Kevent_t, timeout *unix.Timespec) ([]unix.Kevent_t, error) { - n, err := unix.Kevent(kq, nil, events, timeout) +func read(kq int, events []unix.Kevent_t) ([]unix.Kevent_t, error) { + n, err := unix.Kevent(kq, nil, events, nil) if err != nil { return nil, err } return events[0:n], nil } - -// durationToTimespec prepares a timeout value -func durationToTimespec(d time.Duration) unix.Timespec { - return unix.NsecToTimespec(d.Nanoseconds()) -}