Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace Use of Kthread-blocking Epoll with Poller Read, Remove Per-Event LStats on Linux #433

Closed
wants to merge 1 commit into from

Conversation

horahoradev
Copy link
Member

@horahoradev horahoradev commented Feb 28, 2022

NOTE: Please do not open a pull request that adds or changes the public API without giving consideration to all supported operating systems.

What does this pull request do?

This PR:

  1. replaces the use of a raw kthread-blocking epoll call for monitoring inotify events with a blocking Read() call using the Go runtime's poller system. This will allow the individual goroutine to block while waiting for I/O, rather than needing to block an entire kernel thread (see: Linux inotify poller wastes an OS thread #240)
  2. Removes Linux per-event uses of LStat to check that the file still exists before passing the event to the user (see: emits CREATE when it should have been REMOVE #404)

Change #1

Where should the reviewer start?

The changes themselves are fairly trivial, but the concepts are not, so I'd recommend familiarizing yourself with the conceptual background. I've provided a few sources with notes on takeaways, and my explanation and rundown of the Go source code as it pertains to this change.

References
  1. os.File.Read() does not return after os.File.Close() and os.File.Fd() golang/go#26439
  • calling Fd() will prevent Read() from returning after close
  • one supported workflow for performing I/O on file descirptors is: setting the FD to non-blocking mode, then passing the FD to os.NewFile (this is what I ended up doing)
  1. os: add a function to construct *File with non-blocking file descriptor golang/go#22939
  • if a file passed to NewFile is in non-blocking mode, then it will be added to the runtime I/O poller
  1. https://morsmachine.dk/netpoller
  • Socket i/O in Golang uses the netpoller mechanism, which handles I/O events in a separate thread, and enables I/O to block the goroutine rather than the entire kernel thread
  • uses platform-specific async I/O mechanisms (epoll, kqueue, etc)
  • this is socket-specific, but normal FS I/O is probably handled the same way
Golang I/O Rundown

In Golang, I/O is blocking, but we can't directly use blocking syscalls, as this would block whichever kernel thread the Goroutine was running in. As a solution, Go I/O is performed via the internal poller package (https://pkg.go.dev/internal/poll), which parks goroutines while waiting for I/O, and uses the platform-specific asynchronous I/O mechanism (e.g. epoll) to detect FDs for which I/O can be performed.

I went through the code to make sure that this is the case, here's a small rundown:
Here's the implementation of read in *os.FIle:

// Read reads up to len(b) bytes from the File and stores them in b.
// It returns the number of bytes read and any error encountered.
// At end of file, Read returns 0, io.EOF.
func (f *File) Read(b []byte) (n int, err error) {
	if err := f.checkValid("read"); err != nil {
		return 0, err
	}
	n, e := f.read(b)
	return n, f.wrapErr("read", e)
}

we can see this method calls .read(), which is defined as follows for posix:

// read reads up to len(b) bytes from the File.
// It returns the number of bytes read and an error, if any.
func (f *File) read(b []byte) (n int, err error) {
	n, err = f.pfd.Read(b)
	runtime.KeepAlive(f)
	return n, err
}

It calls pfd.Read(), which we can see refers to poll.FD:

// File represents an open file descriptor.
type File struct {
	*file // os specific
}
// file is the real representation of *File.
// The extra level of indirection ensures that no clients of os
// can overwrite this data, which could cause the finalizer
// to close the wrong file descriptor.
type file struct {
	pfd         poll.FD
	name        string
	dirinfo     *dirInfo // nil unless directory being read
	nonblock    bool     // whether we set nonblocking mode
	stdoutOrErr bool     // whether this is stdout or stderr
	appendMode  bool     // whether file is opened for appending
}

The implementation of poll.FD's read for unix is as follows:

// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
	if err := fd.readLock(); err != nil {
		return 0, err
	}
	defer fd.readUnlock()
	if len(p) == 0 {
		// If the caller wanted a zero byte read, return immediately
		// without trying (but after acquiring the readLock).
		// Otherwise syscall.Read returns 0, nil which looks like
		// io.EOF.
		// TODO(bradfitz): make it wait for readability? (Issue 15735)
		return 0, nil
	}
	if err := fd.pd.prepareRead(fd.isFile); err != nil {
		return 0, err
	}
	if fd.IsStream && len(p) > maxRW {
		p = p[:maxRW]
	}
	for {
		n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
		if err != nil {
			n = 0
			if err == syscall.EAGAIN && fd.pd.pollable() {
				if err = fd.pd.waitRead(fd.isFile); err == nil {
					continue
				}
			}
		}
		err = fd.eofError(n, err)
		return n, err
	}
}

We first execute the read syscall. If the file descriptor is non-blocking, then it will return quickly with EAGAIN and wait for waitRead to return. If it's blocking, then it'll cause the current kernel thread to sleep, and will not call waitRead(), as the fd isn't pollable and doesn't return EAGAIN.

waitRead is as follows:

func (pd *pollDesc) waitRead(isFile bool) error {
	return pd.wait('r', isFile)
}

which calls pd.Wait:

func (pd *pollDesc) wait(mode int, isFile bool) error {
	if pd.runtimeCtx == 0 {
		return errors.New("waiting for unsupported file type")
	}
	res := runtime_pollWait(pd.runtimeCtx, mode)
	return convertErr(res, isFile)
}

which calls runtime_pollWait() (note that this is the internal/poll package)

// poll_runtime_pollWait, which is internal/poll.runtime_pollWait,
// waits for a descriptor to be ready for reading or writing,
// according to mode, which is 'r' or 'w'.
// This returns an error code; the codes are defined above.
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
	errcode := netpollcheckerr(pd, int32(mode))
	if errcode != pollNoError {
		return errcode
	}
	// As for now only Solaris, illumos, and AIX use level-triggered IO.
	if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
		netpollarm(pd, mode)
	}
	for !netpollblock(pd, int32(mode), false) {
		errcode = netpollcheckerr(pd, int32(mode))
		if errcode != pollNoError {
			return errcode
		}
		// Can happen if timeout has fired and unblocked us,
		// but before we had a chance to run, timeout has been reset.
		// Pretend it has not happened and retry.
	}
	return pollNoError
}

which calls netpollblock:

// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
// Concurrent calls to netpollblock in the same mode are forbidden, as pollDesc
// can hold only a single waiting goroutine for each mode.
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	// set the gpp semaphore to pdWait
	for {
		// Consume notification if already ready.
		if gpp.CompareAndSwap(pdReady, 0) {
			return true
		}
		if gpp.CompareAndSwap(0, pdWait) {
			break
		}

		// Double check that this isn't corrupt; otherwise we'd loop
		// forever.
		if v := gpp.Load(); v != pdReady && v != 0 {
			throw("runtime: double wait")
		}
	}

	// need to recheck error states after setting gpp to pdWait
	// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
	// do the opposite: store to closing/rd/wd, publishInfo, load of rg/wg
	if waitio || netpollcheckerr(pd, mode) == pollNoError {
		gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
	}
	// be careful to not lose concurrent pdReady notification
	old := gpp.Swap(0)
	if old > pdWait {
		throw("runtime: corrupted polldesc")
	}
	return old == pdReady
}

I'll assume that the comment, " returns true if IO is ready, or false if timedout or closed" is true. The current goroutine is parked until I/O is ready.

gpp is set to ready within the function netpollunblock, which is called by netpollready :

// netpollready is called by the platform-specific netpoll function.
// It declares that the fd associated with pd is ready for I/O.
// The toRun argument is used to build a list of goroutines to return
// from netpoll. The mode argument is 'r', 'w', or 'r'+'w' to indicate
// whether the fd is ready for reading or writing or both.
//
// This may run while the world is stopped, so write barriers are not allowed.
//go:nowritebarrier
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
	var rg, wg *g
	if mode == 'r' || mode == 'r'+'w' {
		rg = netpollunblock(pd, 'r', true)
	}
	if mode == 'w' || mode == 'r'+'w' {
		wg = netpollunblock(pd, 'w', true)
	}
	if rg != nil {
		toRun.push(rg)
	}
	if wg != nil {
		toRun.push(wg)
	}
}

netpollready is called by the platform-specific implementation of netpoll(); here's the implementation for Linux, which uses epoll to determine which FDs are ready for I/O, and notifies the corresponding poll descriptor.

// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) gList {
	if epfd == -1 {
		return gList{}
	}
	var waitms int32
	if delay < 0 {
		waitms = -1
	} else if delay == 0 {
		waitms = 0
	} else if delay < 1e6 {
		waitms = 1
	} else if delay < 1e15 {
		waitms = int32(delay / 1e6)
	} else {
		// An arbitrary cap on how long to wait for a timer.
		// 1e9 ms == ~11.5 days.
		waitms = 1e9
	}
	var events [128]epollevent
retry:
	n := epollwait(epfd, &events[0], int32(len(events)), waitms)
	if n < 0 {
		if n != -_EINTR {
			println("runtime: epollwait on fd", epfd, "failed with", -n)
			throw("runtime: netpoll failed")
		}
		// If a timed sleep was interrupted, just return to
		// recalculate how long we should sleep now.
		if waitms > 0 {
			return gList{}
		}
		goto retry
	}
	var toRun gList
	for i := int32(0); i < n; i++ {
		ev := &events[i]
		if ev.events == 0 {
			continue
		}

		if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
			if ev.events != _EPOLLIN {
				println("runtime: netpoll: break fd ready for", ev.events)
				throw("runtime: netpoll: break fd ready for something unexpected")
			}
			if delay != 0 {
				// netpollBreak could be picked up by a
				// nonblocking poll. Only read the byte
				// if blocking.
				var tmp [16]byte
				read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
				atomic.Store(&netpollWakeSig, 0)
			}
			continue
		}

		var mode int32
		if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
			mode += 'r'
		}
		if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
			mode += 'w'
		}
		if mode != 0 {
			pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
			pd.setEventErr(ev.events == _EPOLLERR)
			netpollready(&toRun, pd, mode)
		}
	}
	return toRun
}

How should this be manually tested?

I think that the changes I've made are fairly easy to reason about, and automated tests should validate that the behavior is correct. I tested this change on Linux, which is the affected platform. I did not test on other platforms.

Change #2

Where should the reviewer start?

I would read #404. My argument is that since inotify guarantees that events will be sent in order, and by the time the client receives the event for which we're calling lstat the file may have been removed anyway, there's no need to perform LStat for so many file events.

How should this be manually tested?

I think the automated tests should cover it. I tested this change on Linux, which is the affected platform. I did not test on other platforms.

@horahoradev horahoradev changed the title Replace Use of Kthread-blocking Epoll with Poller Read Replace Use of Kthread-blocking Epoll with Poller Read, Remove Per-Event LStats on Linux Feb 28, 2022
arp242 added a commit that referenced this pull request Jul 24, 2022
…ent LStats on Linux #433 (#434)

* Replaced use of raw epoll with netpoller read

* Remove Debian 6 Vagrant test; it's in #469 now

* Added ignoreLinux lstats back in

* Update test

Co-authored-by: Martin Tournoij <martin@arp242.net>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

1 participant