diff --git a/kqueue.go b/kqueue.go index 789fbe1f..756e51e7 100644 --- a/kqueue.go +++ b/kqueue.go @@ -22,7 +22,7 @@ import ( type Watcher struct { Events chan Event Errors chan error - done chan bool + done chan struct{} kq int // File descriptor (as returned by the kqueue() syscall). closepipe [2]int // Pipe used for closing. @@ -60,6 +60,7 @@ func NewWatcher() (*Watcher, error) { externalWatches: make(map[string]bool), Events: make(chan Event), Errors: make(chan error), + done: make(chan struct{}), } go w.readEvents() @@ -88,7 +89,6 @@ func (w *Watcher) Close() error { } // Send "quit" message to the reader goroutine. - w.done <- true unix.Close(w.closepipe[1]) return nil @@ -113,8 +113,8 @@ func (w *Watcher) Remove(name string) error { return fmt.Errorf("%w: %s", ErrNonExistentWatch, name) } - const registerRemove = unix.EV_DELETE - if err := register(w.kq, []int{watchfd}, registerRemove, 0); err != nil { + err := register(w.kq, []int{watchfd}, unix.EV_DELETE, 0) + if err != nil { return err } @@ -253,8 +253,8 @@ func (w *Watcher) addWatch(name string, flags uint32) (string, error) { isDir = fi.IsDir() } - const registerAdd = unix.EV_ADD | unix.EV_CLEAR | unix.EV_ENABLE - if err := register(w.kq, []int{watchfd}, registerAdd, flags); err != nil { + err := register(w.kq, []int{watchfd}, unix.EV_ADD|unix.EV_CLEAR|unix.EV_ENABLE, flags) + if err != nil { unix.Close(watchfd) return "", err } @@ -305,6 +305,7 @@ func (w *Watcher) readEvents() { w.Errors <- err } unix.Close(w.closepipe[0]) + close(w.done) close(w.Events) close(w.Errors) }() @@ -326,8 +327,8 @@ func (w *Watcher) readEvents() { for _, kevent := range kevents { watchfd := int(kevent.Ident) mask := uint32(kevent.Fflags) - // Shut down the loop when the pipe is closed, - // but only after all other events have been processed. + // Shut down the loop when the pipe is closed, but only after all + // other events have been processed. if watchfd == w.closepipe[0] { closed = true continue @@ -394,18 +395,6 @@ func (w *Watcher) readEvents() { } } } - - // cleanup - err := unix.Close(w.kq) - if err != nil { - // only way the previous loop breaks is if w.done was closed so we need to async send to w.Errors. - select { - case w.Errors <- err: - default: - } - } - close(w.Events) - close(w.Errors) } // newEvent returns an platform-independent Event based on kqueue Fflags. @@ -522,20 +511,32 @@ func (w *Watcher) internalWatch(name string, fileInfo os.FileInfo) (string, erro } // kqueue creates a new kernel event queue and returns a descriptor. +// +// This registers a new event on closepipe, which will trigger an event when +// it's closed. This way we can use kevent() without timeout/polling; without +// the closepipe, it would block forever and we wouldn't be able to stop it at +// all. func kqueue() (kq int, closepipe [2]int, err error) { kq, err = unix.Kqueue() if kq == -1 { return kq, closepipe, err } + // Register the close pipe. - if err := unix.Pipe(closepipe[:]); err != nil { + err = unix.Pipe(closepipe[:]) + if err != nil { unix.Close(kq) return kq, closepipe, err } - events := make([]unix.Kevent_t, 1) - const pflag = unix.EV_ADD | unix.EV_ENABLE | unix.EV_ONESHOT - unix.SetKevent(&events[0], closepipe[0], unix.EVFILT_READ, pflag) - if ok, err := unix.Kevent(kq, events, nil, nil); ok == -1 { + + // Register changes to listen on the closepipe. + changes := make([]unix.Kevent_t, 1) + // SetKevent converts int to the platform-specific types. + unix.SetKevent(&changes[0], closepipe[0], unix.EVFILT_READ, + unix.EV_ADD|unix.EV_ENABLE|unix.EV_ONESHOT) + + ok, err := unix.Kevent(kq, changes, nil, nil) + if ok == -1 { unix.Close(kq) unix.Close(closepipe[0]) unix.Close(closepipe[1]) @@ -544,17 +545,16 @@ func kqueue() (kq int, closepipe [2]int, err error) { return kq, closepipe, nil } -// register events with the queue +// Register events with the queue. func register(kq int, fds []int, flags int, fflags uint32) error { changes := make([]unix.Kevent_t, len(fds)) - for i, fd := range fds { - // SetKevent converts int to the platform-specific types: + // SetKevent converts int to the platform-specific types. unix.SetKevent(&changes[i], fd, unix.EVFILT_VNODE, flags) changes[i].Fflags = fflags } - // register the events + // Register the events. success, err := unix.Kevent(kq, changes, nil, nil) if success == -1 { return err