Skip to content

Commit

Permalink
Fix up this PR after merge
Browse files Browse the repository at this point in the history
The timeout for unix.Kevent() is causing issues; every 100ms it will do a new
unix.Kevent() syscall, which isn't too efficient: even if you have just one
change an hour, you will still keep calling kevent() ten times per second,
resulting in a needlessly high CPU usage.

Without a timeout, kevent() will block indefinitely until there are some events,
which is much more efficient. We can't just remove the timout however, since we
can't interrupt the kevent() call on FreeBSD and NetBSD, and it will hang
forever. This issue is described in more detail here:
#262 (comment)

To solve this we register a new kevent() with the file descriptor set to the
closepipe; when this pipe is closed an event is sent and kevent() will stop
blocking.

This is a rebased version of #124.

Fixes #89
Fixes #237
Fixes #333
Supersedes and closes #124
Supersedes and closes #262
Supersedes and closes #334
  • Loading branch information
arp242 committed Aug 1, 2022
1 parent adf163d commit fe3d62d
Showing 1 changed file with 29 additions and 29 deletions.
58 changes: 29 additions & 29 deletions kqueue.go
Expand Up @@ -22,7 +22,7 @@ import (
type Watcher struct {
Events chan Event
Errors chan error
done chan bool
done chan struct{}

kq int // File descriptor (as returned by the kqueue() syscall).
closepipe [2]int // Pipe used for closing.
Expand Down Expand Up @@ -60,6 +60,7 @@ func NewWatcher() (*Watcher, error) {
externalWatches: make(map[string]bool),
Events: make(chan Event),
Errors: make(chan error),
done: make(chan struct{}),
}

go w.readEvents()
Expand Down Expand Up @@ -88,7 +89,6 @@ func (w *Watcher) Close() error {
}

// Send "quit" message to the reader goroutine.
w.done <- true
unix.Close(w.closepipe[1])

return nil
Expand All @@ -113,8 +113,8 @@ func (w *Watcher) Remove(name string) error {
return fmt.Errorf("%w: %s", ErrNonExistentWatch, name)
}

const registerRemove = unix.EV_DELETE
if err := register(w.kq, []int{watchfd}, registerRemove, 0); err != nil {
err := register(w.kq, []int{watchfd}, unix.EV_DELETE, 0)
if err != nil {
return err
}

Expand Down Expand Up @@ -253,8 +253,8 @@ func (w *Watcher) addWatch(name string, flags uint32) (string, error) {
isDir = fi.IsDir()
}

const registerAdd = unix.EV_ADD | unix.EV_CLEAR | unix.EV_ENABLE
if err := register(w.kq, []int{watchfd}, registerAdd, flags); err != nil {
err := register(w.kq, []int{watchfd}, unix.EV_ADD|unix.EV_CLEAR|unix.EV_ENABLE, flags)
if err != nil {
unix.Close(watchfd)
return "", err
}
Expand Down Expand Up @@ -305,6 +305,7 @@ func (w *Watcher) readEvents() {
w.Errors <- err
}
unix.Close(w.closepipe[0])
close(w.done)
close(w.Events)
close(w.Errors)
}()
Expand All @@ -326,8 +327,8 @@ func (w *Watcher) readEvents() {
for _, kevent := range kevents {
watchfd := int(kevent.Ident)
mask := uint32(kevent.Fflags)
// Shut down the loop when the pipe is closed,
// but only after all other events have been processed.
// Shut down the loop when the pipe is closed, but only after all
// other events have been processed.
if watchfd == w.closepipe[0] {
closed = true
continue
Expand Down Expand Up @@ -394,18 +395,6 @@ func (w *Watcher) readEvents() {
}
}
}

// cleanup
err := unix.Close(w.kq)
if err != nil {
// only way the previous loop breaks is if w.done was closed so we need to async send to w.Errors.
select {
case w.Errors <- err:
default:
}
}
close(w.Events)
close(w.Errors)
}

// newEvent returns an platform-independent Event based on kqueue Fflags.
Expand Down Expand Up @@ -522,20 +511,32 @@ func (w *Watcher) internalWatch(name string, fileInfo os.FileInfo) (string, erro
}

// kqueue creates a new kernel event queue and returns a descriptor.
//
// This registers a new event on closepipe, which will trigger an event when
// it's closed. This way we can use kevent() without timeout/polling; without
// the closepipe, it would block forever and we wouldn't be able to stop it at
// all.
func kqueue() (kq int, closepipe [2]int, err error) {
kq, err = unix.Kqueue()
if kq == -1 {
return kq, closepipe, err
}

// Register the close pipe.
if err := unix.Pipe(closepipe[:]); err != nil {
err = unix.Pipe(closepipe[:])
if err != nil {
unix.Close(kq)
return kq, closepipe, err
}
events := make([]unix.Kevent_t, 1)
const pflag = unix.EV_ADD | unix.EV_ENABLE | unix.EV_ONESHOT
unix.SetKevent(&events[0], closepipe[0], unix.EVFILT_READ, pflag)
if ok, err := unix.Kevent(kq, events, nil, nil); ok == -1 {

// Register changes to listen on the closepipe.
changes := make([]unix.Kevent_t, 1)
// SetKevent converts int to the platform-specific types.
unix.SetKevent(&changes[0], closepipe[0], unix.EVFILT_READ,
unix.EV_ADD|unix.EV_ENABLE|unix.EV_ONESHOT)

ok, err := unix.Kevent(kq, changes, nil, nil)
if ok == -1 {
unix.Close(kq)
unix.Close(closepipe[0])
unix.Close(closepipe[1])
Expand All @@ -544,17 +545,16 @@ func kqueue() (kq int, closepipe [2]int, err error) {
return kq, closepipe, nil
}

// register events with the queue
// Register events with the queue.
func register(kq int, fds []int, flags int, fflags uint32) error {
changes := make([]unix.Kevent_t, len(fds))

for i, fd := range fds {
// SetKevent converts int to the platform-specific types:
// SetKevent converts int to the platform-specific types.
unix.SetKevent(&changes[i], fd, unix.EVFILT_VNODE, flags)
changes[i].Fflags = fflags
}

// register the events
// Register the events.
success, err := unix.Kevent(kq, changes, nil, nil)
if success == -1 {
return err
Expand Down

0 comments on commit fe3d62d

Please sign in to comment.