Skip to content

Commit

Permalink
kqueue.go: Remove timeout from reading kevents.
Browse files Browse the repository at this point in the history
Instead of having a timeout that would trigger the readEvents to check it's
done channel, run a seaprate go-routine that watches w.done, and directly
closes the KQueue FD.

This change allows a go-process to fully sleep and go into CPU idle mode,
instead of waking up every 100ms on macOS.
  • Loading branch information
paulquerna-okta authored and arp242 committed Jul 30, 2022
1 parent 57e6a49 commit 7206899
Showing 1 changed file with 23 additions and 11 deletions.
34 changes: 23 additions & 11 deletions kqueue.go
Expand Up @@ -25,7 +25,7 @@ type Watcher struct {
Errors chan error
done chan struct{} // Channel for sending a "quit message" to the reader goroutine

kq int // File descriptor (as returned by the kqueue() syscall).
_kq int // File descriptor (as returned by the kqueue() syscall).

mu sync.Mutex // Protects access to watcher data
watches map[string]int // Map of watched file descriptors (key: path).
Expand All @@ -50,7 +50,7 @@ func NewWatcher() (*Watcher, error) {
}

w := &Watcher{
kq: kq,
_kq: kq,
watches: make(map[string]int),
watchesByDir: make(map[string]map[int]struct{}),
dirFlags: make(map[string]uint32),
Expand All @@ -62,6 +62,7 @@ func NewWatcher() (*Watcher, error) {
done: make(chan struct{}),
}

go w.closeKq()
go w.readEvents()
return w, nil
}
Expand Down Expand Up @@ -113,7 +114,7 @@ func (w *Watcher) Remove(name string) error {
}

const registerRemove = unix.EV_DELETE
if err := register(w.kq, []int{watchfd}, registerRemove, 0); err != nil {
if err := register(w.kq(), []int{watchfd}, registerRemove, 0); err != nil {
return err
}

Expand Down Expand Up @@ -172,9 +173,6 @@ func (w *Watcher) WatchList() []string {
// Watch all events (except NOTE_EXTEND, NOTE_LINK, NOTE_REVOKE)
const noteAllEvents = unix.NOTE_DELETE | unix.NOTE_WRITE | unix.NOTE_ATTRIB | unix.NOTE_RENAME

// keventWaitTime to block on each read from kevent
var keventWaitTime = durationToTimespec(100 * time.Millisecond)

// addWatch adds name to the watched file set.
// The flags are interpreted as described in kevent(2).
// Returns the real path to the file which was added, if any, which may be different from the one passed in the case of symlinks.
Expand Down Expand Up @@ -255,7 +253,7 @@ func (w *Watcher) addWatch(name string, flags uint32) (string, error) {
}

const registerAdd = unix.EV_ADD | unix.EV_CLEAR | unix.EV_ENABLE
if err := register(w.kq, []int{watchfd}, registerAdd, flags); err != nil {
if err := register(w.kq(), []int{watchfd}, registerAdd, flags); err != nil {
unix.Close(watchfd)
return "", err
}
Expand Down Expand Up @@ -296,6 +294,20 @@ func (w *Watcher) addWatch(name string, flags uint32) (string, error) {
return name, nil
}

func (w *Watcher) kq() int {
w.mu.Lock()
defer w.mu.Unlock()
return w._kq
}

func (w *Watcher) closeKq() {
<-w.done
w.mu.Lock()
unix.Close(w._kq)
w._kq = -1
w.mu.Unlock()
}

// readEvents reads from kqueue and converts the received kevents into
// Event values that it sends down the Events channel.
func (w *Watcher) readEvents() {
Expand All @@ -311,9 +323,9 @@ loop:
}

// Get new events
kevents, err := read(w.kq, eventBuffer, &keventWaitTime)
kevents, err := read(w.kq(), eventBuffer, nil)
// EINTR is okay, the syscall was interrupted before timeout expired.
if err != nil && err != unix.EINTR {
if err != nil && err != unix.EINTR && err != unix.EBADF {
select {
case w.Errors <- err:
case <-w.done:
Expand Down Expand Up @@ -392,8 +404,8 @@ loop:
}

// cleanup
err := unix.Close(w.kq)
if err != nil {
err := unix.Close(w.kq())
if err != nil && err != unix.EBADF {
// only way the previous loop breaks is if w.done was closed so we need to async send to w.Errors.
select {
case w.Errors <- err:
Expand Down

0 comments on commit 7206899

Please sign in to comment.