Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kqueue.go: Remove timeout from reading kevents. #262

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 23 additions & 11 deletions kqueue.go
Expand Up @@ -25,7 +25,7 @@ type Watcher struct {
Errors chan error
done chan struct{} // Channel for sending a "quit message" to the reader goroutine

kq int // File descriptor (as returned by the kqueue() syscall).
_kq int // File descriptor (as returned by the kqueue() syscall).

mu sync.Mutex // Protects access to watcher data
watches map[string]int // Map of watched file descriptors (key: path).
Expand All @@ -50,7 +50,7 @@ func NewWatcher() (*Watcher, error) {
}

w := &Watcher{
kq: kq,
_kq: kq,
watches: make(map[string]int),
watchesByDir: make(map[string]map[int]struct{}),
dirFlags: make(map[string]uint32),
Expand All @@ -62,6 +62,7 @@ func NewWatcher() (*Watcher, error) {
done: make(chan struct{}),
}

go w.closeKq()
go w.readEvents()
return w, nil
}
Expand Down Expand Up @@ -113,7 +114,7 @@ func (w *Watcher) Remove(name string) error {
}

const registerRemove = unix.EV_DELETE
if err := register(w.kq, []int{watchfd}, registerRemove, 0); err != nil {
if err := register(w.kq(), []int{watchfd}, registerRemove, 0); err != nil {
return err
}

Expand Down Expand Up @@ -172,9 +173,6 @@ func (w *Watcher) WatchList() []string {
// Watch all events (except NOTE_EXTEND, NOTE_LINK, NOTE_REVOKE)
const noteAllEvents = unix.NOTE_DELETE | unix.NOTE_WRITE | unix.NOTE_ATTRIB | unix.NOTE_RENAME

// keventWaitTime to block on each read from kevent
var keventWaitTime = durationToTimespec(100 * time.Millisecond)

// addWatch adds name to the watched file set.
// The flags are interpreted as described in kevent(2).
// Returns the real path to the file which was added, if any, which may be different from the one passed in the case of symlinks.
Expand Down Expand Up @@ -256,7 +254,7 @@ func (w *Watcher) addWatch(name string, flags uint32) (string, error) {
}

const registerAdd = unix.EV_ADD | unix.EV_CLEAR | unix.EV_ENABLE
if err := register(w.kq, []int{watchfd}, registerAdd, flags); err != nil {
if err := register(w.kq(), []int{watchfd}, registerAdd, flags); err != nil {
unix.Close(watchfd)
return "", err
}
Expand Down Expand Up @@ -297,6 +295,20 @@ func (w *Watcher) addWatch(name string, flags uint32) (string, error) {
return name, nil
}

func (w *Watcher) kq() int {
w.mu.Lock()
defer w.mu.Unlock()
return w._kq
}

func (w *Watcher) closeKq() {
<-w.done
w.mu.Lock()
unix.Close(w._kq)
w._kq = -1
w.mu.Unlock()
}

// readEvents reads from kqueue and converts the received kevents into
// Event values that it sends down the Events channel.
func (w *Watcher) readEvents() {
Expand All @@ -312,9 +324,9 @@ loop:
}

// Get new events
kevents, err := read(w.kq, eventBuffer, &keventWaitTime)
kevents, err := read(w.kq(), eventBuffer, nil)
// EINTR is okay, the syscall was interrupted before timeout expired.
if err != nil && err != unix.EINTR {
if err != nil && err != unix.EINTR && err != unix.EBADF {
select {
case w.Errors <- err:
case <-w.done:
Expand Down Expand Up @@ -393,8 +405,8 @@ loop:
}

// cleanup
err := unix.Close(w.kq)
if err != nil {
err := unix.Close(w.kq())
if err != nil && err != unix.EBADF {
// only way the previous loop breaks is if w.done was closed so we need to async send to w.Errors.
select {
case w.Errors <- err:
Expand Down