Skip to content

Commit

Permalink
kqueue: remove timeout from unix.Kevent() (#480)
Browse files Browse the repository at this point in the history
The timeout for unix.Kevent() is causing issues; every 100ms it will do a new
unix.Kevent() syscall, which isn't too efficient: even if you have just one
change an hour, you will still keep calling kevent() ten times per second,
resulting in a needlessly high CPU usage.

Without a timeout, kevent() will block indefinitely until there are some events,
which is much more efficient. We can't just remove the timout however, since we
can't interrupt the kevent() call on FreeBSD and NetBSD, and it will hang
forever. This issue is described in more detail here:
#262 (comment)

To solve this we register a new kevent() with the file descriptor set to the
closepipe; when this pipe is closed an event is sent and kevent() will stop
blocking.

This is a rebased version of #124.

Fixes #89
Fixes #237
Fixes #333
Supersedes and closes #124
Supersedes and closes #262
Supersedes and closes #334

Co-authored-by: Felix Lange <fjl@twurst.com>
  • Loading branch information
arp242 and fjl committed Aug 6, 2022
1 parent a24f78c commit 4b43fad
Showing 1 changed file with 70 additions and 57 deletions.
127 changes: 70 additions & 57 deletions kqueue.go
Expand Up @@ -14,7 +14,6 @@ import (
"os"
"path/filepath"
"sync"
"time"

"golang.org/x/sys/unix"
)
Expand All @@ -23,9 +22,10 @@ import (
type Watcher struct {
Events chan Event
Errors chan error
done chan struct{} // Channel for sending a "quit message" to the reader goroutine
done chan struct{}

kq int // File descriptor (as returned by the kqueue() syscall).
kq int // File descriptor (as returned by the kqueue() syscall).
closepipe [2]int // Pipe used for closing.

mu sync.Mutex // Protects access to watcher data
watches map[string]int // Watched file descriptors (key: path).
Expand All @@ -44,13 +44,14 @@ type pathInfo struct {

// NewWatcher establishes a new watcher with the underlying OS and begins waiting for events.
func NewWatcher() (*Watcher, error) {
kq, err := kqueue()
kq, closepipe, err := kqueue()
if err != nil {
return nil, err
}

w := &Watcher{
kq: kq,
closepipe: closepipe,
watches: make(map[string]int),
watchesByDir: make(map[string]map[int]struct{}),
dirFlags: make(map[string]uint32),
Expand Down Expand Up @@ -87,8 +88,8 @@ func (w *Watcher) Close() error {
w.Remove(name)
}

// send a "quit" message to the reader goroutine
close(w.done)
// Send "quit" message to the reader goroutine.
unix.Close(w.closepipe[1])

return nil
}
Expand All @@ -112,8 +113,8 @@ func (w *Watcher) Remove(name string) error {
return fmt.Errorf("%w: %s", ErrNonExistentWatch, name)
}

const registerRemove = unix.EV_DELETE
if err := register(w.kq, []int{watchfd}, registerRemove, 0); err != nil {
err := register(w.kq, []int{watchfd}, unix.EV_DELETE, 0)
if err != nil {
return err
}

Expand Down Expand Up @@ -172,9 +173,6 @@ func (w *Watcher) WatchList() []string {
// Watch all events (except NOTE_EXTEND, NOTE_LINK, NOTE_REVOKE)
const noteAllEvents = unix.NOTE_DELETE | unix.NOTE_WRITE | unix.NOTE_ATTRIB | unix.NOTE_RENAME

// keventWaitTime to block on each read from kevent
var keventWaitTime = durationToTimespec(100 * time.Millisecond)

// addWatch adds name to the watched file set.
// The flags are interpreted as described in kevent(2).
// Returns the real path to the file which was added, if any, which may be different from the one passed in the case of symlinks.
Expand Down Expand Up @@ -255,8 +253,8 @@ func (w *Watcher) addWatch(name string, flags uint32) (string, error) {
isDir = fi.IsDir()
}

const registerAdd = unix.EV_ADD | unix.EV_CLEAR | unix.EV_ENABLE
if err := register(w.kq, []int{watchfd}, registerAdd, flags); err != nil {
err := register(w.kq, []int{watchfd}, unix.EV_ADD|unix.EV_CLEAR|unix.EV_ENABLE, flags)
if err != nil {
unix.Close(watchfd)
return "", err
}
Expand Down Expand Up @@ -301,35 +299,44 @@ func (w *Watcher) addWatch(name string, flags uint32) (string, error) {
// Event values that it sends down the Events channel.
func (w *Watcher) readEvents() {
eventBuffer := make([]unix.Kevent_t, 10)

loop:
for {
// See if there is a message on the "done" channel
select {
case <-w.done:
break loop
default:
defer func() {
err := unix.Close(w.kq)
if err != nil {
w.Errors <- err
}

// Get new events
kevents, err := read(w.kq, eventBuffer, &keventWaitTime)
unix.Close(w.closepipe[0])
close(w.done)
close(w.Events)
close(w.Errors)
}()

for closed := false; !closed; {
kevents, err := read(w.kq, eventBuffer)
// EINTR is okay, the syscall was interrupted before timeout expired.
if err != nil && err != unix.EINTR {
select {
case w.Errors <- err:
case <-w.done:
break loop
closed = true
continue
}
continue
}

// Flush the events we received to the Events channel
for len(kevents) > 0 {
for _, kevent := range kevents {
var (
kevent = &kevents[0]
watchfd = int(kevent.Ident)
mask = uint32(kevent.Fflags)
)

// Shut down the loop when the pipe is closed, but only after all
// other events have been processed.
if watchfd == w.closepipe[0] {
closed = true
continue
}

w.mu.Lock()
path := w.paths[watchfd]
w.mu.Unlock()
Expand Down Expand Up @@ -360,7 +367,8 @@ loop:
select {
case w.Events <- event:
case <-w.done:
break loop
closed = true
continue
}
}

Expand Down Expand Up @@ -388,23 +396,8 @@ loop:
}
}
}

// Move to next event
kevents = kevents[1:]
}
}

// cleanup
err := unix.Close(w.kq)
if err != nil {
// only way the previous loop breaks is if w.done was closed so we need to async send to w.Errors.
select {
case w.Errors <- err:
default:
}
}
close(w.Events)
close(w.Errors)
}

// newEvent returns an platform-independent Event based on kqueue Fflags.
Expand Down Expand Up @@ -517,25 +510,50 @@ func (w *Watcher) internalWatch(name string, fileInfo os.FileInfo) (string, erro
}

// kqueue creates a new kernel event queue and returns a descriptor.
func kqueue() (kq int, err error) {
//
// This registers a new event on closepipe, which will trigger an event when
// it's closed. This way we can use kevent() without timeout/polling; without
// the closepipe, it would block forever and we wouldn't be able to stop it at
// all.
func kqueue() (kq int, closepipe [2]int, err error) {
kq, err = unix.Kqueue()
if kq == -1 {
return kq, err
return kq, closepipe, err
}

// Register the close pipe.
err = unix.Pipe(closepipe[:])
if err != nil {
unix.Close(kq)
return kq, closepipe, err
}

// Register changes to listen on the closepipe.
changes := make([]unix.Kevent_t, 1)
// SetKevent converts int to the platform-specific types.
unix.SetKevent(&changes[0], closepipe[0], unix.EVFILT_READ,
unix.EV_ADD|unix.EV_ENABLE|unix.EV_ONESHOT)

ok, err := unix.Kevent(kq, changes, nil, nil)
if ok == -1 {
unix.Close(kq)
unix.Close(closepipe[0])
unix.Close(closepipe[1])
return kq, closepipe, err
}
return kq, nil
return kq, closepipe, nil
}

// register events with the queue
// Register events with the queue.
func register(kq int, fds []int, flags int, fflags uint32) error {
changes := make([]unix.Kevent_t, len(fds))

for i, fd := range fds {
// SetKevent converts int to the platform-specific types:
// SetKevent converts int to the platform-specific types.
unix.SetKevent(&changes[i], fd, unix.EVFILT_VNODE, flags)
changes[i].Fflags = fflags
}

// register the events
// Register the events.
success, err := unix.Kevent(kq, changes, nil, nil)
if success == -1 {
return err
Expand All @@ -545,15 +563,10 @@ func register(kq int, fds []int, flags int, fflags uint32) error {

// read retrieves pending events, or waits until an event occurs.
// A timeout of nil blocks indefinitely, while 0 polls the queue.
func read(kq int, events []unix.Kevent_t, timeout *unix.Timespec) ([]unix.Kevent_t, error) {
n, err := unix.Kevent(kq, nil, events, timeout)
func read(kq int, events []unix.Kevent_t) ([]unix.Kevent_t, error) {
n, err := unix.Kevent(kq, nil, events, nil)
if err != nil {
return nil, err
}
return events[0:n], nil
}

// durationToTimespec prepares a timeout value
func durationToTimespec(d time.Duration) unix.Timespec {
return unix.NsecToTimespec(d.Nanoseconds())
}

0 comments on commit 4b43fad

Please sign in to comment.