From 1750e7096a50354ca8e0de5b18c0b710e5c949bf Mon Sep 17 00:00:00 2001 From: Jeff Hostetler Date: Tue, 27 Jun 2023 10:01:27 -0400 Subject: [PATCH] pipe: add server backlog for concurrent Accept() Teach `pipe.go:ListenPipe()` to create multiple instances of the server pipe in the kernel so that client connections are less likely to receive a `windows.ERROR_PIPE_BUSY` error. This is conceptually similar to the `backlog` argument of the Unix `listen(2)` function. The current `listenerRoutine()` function works sequentially and in response to calls to `Accept()`, such that there will never be more than one unbound server pipe in the NPFS present at any time. Even if the server application calls `Accept()` concurrrently from a pool of application threads, the `listenerRoutine()` will process them sequentially. In this model, because there is only one `listenerRoutine()` instance, there is an interval of time where there are no available unbound/free server pipes. This happens when `ConnectNamedPipe()` returns and `listenerRoutine()` sends the new pipe handle via a channel to the caller of `Accept()` where the application code has an opportunity to use the pipe or give it to another goroutine and then call `Accept()` again. The subsequent `Accept()` call causes `listenerRoutine()` to create a new unbound serer pipe instance in the file system and wait for the next connection. Anytime during this interval, a client will get a pipe busy error. Code in `DialPipe()` hides this from GOLANG callers because it includes a busy-retry loop. However, clients written in other languages without this assistance are likely to see the busy error and be forced deal with it. This change introduces an "accept queue" using a buffered channel and splits `listenerRoutine()` into a pool of listener worker threads. Each worker creates a new unbound pipe instance in the file system and waits for a client connection. The NPFS and kernel handle connection delivery to a random listener worker. The resulting connected pipe is then delivered back to the caller of `Accept()` as before. A `PipeConfig.QueueSize` variable controls the number of listener worker threads and the maximum number of unbound/free pipes server pipes that will be present at any given time. Note that a listener worker will normally have an unbound/free pipe except during that same delivery interval. Having multiple active workers (and unbound pipes in the file system) gives us extra capacity to handle rapidly arriving connections and minimizes the odds of a client seeing a busy error. The application is encouraged to call `Accept()` from a pool of application workers. The size of the application pool should be the same or larger than the queue size to take full advantage of the listener queue. To preserve backwards compatibility, a queue size of 0 or 1 will behave as before. Also for backwards compatibility, listener workers are required to wait for an `Accept()` call so that the worker has a return channel to send the connected pipe and/or error code. This implies that the number of unbound pipes will be the smaller of the queue size and the application pool size. Finally, a Mutex was added to `l.Close()` to ensure that concurrent threads do not simultaneously try to shutdown the pipe. Signed-off-by: Jeff Hostetler --- pipe.go | 194 ++++++++++++++++++++++++++++++++++---------- pipe_test.go | 225 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 377 insertions(+), 42 deletions(-) diff --git a/pipe.go b/pipe.go index 9ecbbc04..4cd99908 100644 --- a/pipe.go +++ b/pipe.go @@ -11,6 +11,7 @@ import ( "net" "os" "runtime" + "sync" "time" "unsafe" @@ -297,9 +298,30 @@ type win32PipeListener struct { firstHandle windows.Handle path string config PipeConfig - acceptCh chan (chan acceptResponse) - closeCh chan int - doneCh chan int + + // `acceptQueueCh` is a buffered channel (of channels). Calls to + // Accept() will append to this queue to schedule a listener-worker + // to create a new named pipe instance in the named pipe file system + // (NPFS) and then listen for a connection from a client. + // + // The resulting connected pipe (or error) will be signalled (back + // to `Accept()`) on the channel value's channel. + acceptQueueCh chan (chan acceptResponse) + + // `shutdownStartedCh` will be closed to indicate that all listener + // workers should shutdown. `l.Close()` will signal this to begin + // a shutdown. + shutdownStartedCh chan struct{} + + // `shutdownFinishedCh` will be closed to indicate that `l.listenerRoutine()` + // has stopped all of the listener worker threads and has finished the + // shutdown. `l.Close()` must wait for this signal before returning. + shutdownFinishedCh chan struct{} + + // `closeMux` is used to create a critical section in `l.Close()` and + // coordinate the shutdown and prevent problems if a second thread calls + // `l.Close()` while a shutdown is in progress. + closeMux sync.Mutex } func makeServerPipeHandle(path string, sd []byte, c *PipeConfig, first bool) (windows.Handle, error) { @@ -426,45 +448,56 @@ func (l *win32PipeListener) makeConnectedServerPipe() (*win32File, error) { p.Close() p = nil } - case <-l.closeCh: + case <-l.shutdownStartedCh: // Abort the connect request by closing the handle. p.Close() p = nil err = <-ch - if err == nil || err == ErrFileClosed { //nolint:errorlint // err is Errno + if err == nil || err == ErrFileClosed || err == windows.ERROR_OPERATION_ABORTED { //nolint:errorlint // err is Errno err = ErrPipeListenerClosed } } return p, err } -func (l *win32PipeListener) listenerRoutine() { - closed := false - for !closed { +func (l *win32PipeListener) listenerWorker(wg *sync.WaitGroup) { + var stop bool + for !stop { select { - case <-l.closeCh: - closed = true - case responseCh := <-l.acceptCh: - var ( - p *win32File - err error - ) - for { - p, err = l.makeConnectedServerPipe() - // If the connection was immediately closed by the client, try - // again. - if err != windows.ERROR_NO_DATA { //nolint:errorlint // err is Errno - break - } - } + case <-l.shutdownStartedCh: + stop = true + case responseCh := <-l.acceptQueueCh: + p, err := l.makeConnectedServerPipe() responseCh <- acceptResponse{p, err} - closed = err == ErrPipeListenerClosed //nolint:errorlint // err is Errno } } + + wg.Done() +} + +func (l *win32PipeListener) listenerRoutine(queueSize int) { + var wg sync.WaitGroup + + for k := 0; k < queueSize; k++ { + wg.Add(1) + go l.listenerWorker(&wg) + } + + wg.Wait() // for all listenerWorkers to finish. + + // We can assert here that `l.shutdownStartedCh` has been + // signalled (since `l.Close()` closed it). + // + // We might consider draining the `l.acceptQueueCh` and + // closing each of the channel instances, but that is not + // necessary since the second "select" in `l.Accept()` is + // waiting on the `requestCh` and `l.shutdownFinishedCh`. + // And we're going to signal the latter in a moment. + windows.Close(l.firstHandle) l.firstHandle = 0 // Notify Close() and Accept() callers that the handle has been closed. - close(l.doneCh) + close(l.shutdownFinishedCh) } // PipeConfig contain configuration for the pipe listener. @@ -485,6 +518,19 @@ type PipeConfig struct { // OutputBufferSize specifies the size of the output buffer, in bytes. OutputBufferSize int32 + + // QueueSize specifies the maximum number of concurrently active pipe server + // handles to allow. This is conceptually similar to the `backlog` argument + // to `listen(2)` on Unix systems. Increasing this value reduces the likelyhood + // of a connecting client receiving a `windows.ERROR_PIPE_BUSY` error. + // (Assuming that the server is written to call `l.Accept()` using a pool of + // application worker threads.) + // + // This value should be larger than your expected client arrival rate so that + // there are always a few extra listener worker threads and (more importantly) + // unbound server pipes in the kernel, so that a client "CreateFile()" should + // not get a busy signal. + QueueSize int32 } // ListenPipe creates a listener on a Windows named pipe path, e.g. \\.\pipe\mypipe. @@ -503,19 +549,30 @@ func ListenPipe(path string, c *PipeConfig) (net.Listener, error) { return nil, err } } + + queueSize := int(c.QueueSize) + if queueSize < 1 { + // Legacy calls will pass 0 since they won't know to set the queue size. + // Default to legacy behavior where we never have more than 1 available + // unbound pipe and that is only present when an application thread is + // blocked in `l.Accept()`. + queueSize = 1 + } + h, err := makeServerPipeHandle(path, sd, c, true) if err != nil { return nil, err } l := &win32PipeListener{ - firstHandle: h, - path: path, - config: *c, - acceptCh: make(chan (chan acceptResponse)), - closeCh: make(chan int), - doneCh: make(chan int), - } - go l.listenerRoutine() + firstHandle: h, + path: path, + config: *c, + acceptQueueCh: make(chan chan acceptResponse, queueSize), + shutdownStartedCh: make(chan struct{}), + shutdownFinishedCh: make(chan struct{}), + closeMux: sync.Mutex{}, + } + go l.listenerRoutine(queueSize) return l, nil } @@ -535,13 +592,44 @@ func connectPipe(p *win32File) error { } func (l *win32PipeListener) Accept() (net.Conn, error) { +tryAgain: ch := make(chan acceptResponse) + select { - case l.acceptCh <- ch: - response := <-ch - err := response.err - if err != nil { - return nil, err + case l.acceptQueueCh <- ch: + // We have queued a request for a worker thread to listen + // for a connection. + case <-l.shutdownFinishedCh: + // The shutdown completed before we could request a connection. + return nil, ErrPipeListenerClosed + case <-l.shutdownStartedCh: + // The shutdown is already in progress. Don't bother trying to + // schedule a new request. + return nil, ErrPipeListenerClosed + } + + // We queued a request. Now wait for a connection signal or a + // shutdown while we were waiting. + + select { + case response := <-ch: + if response.f == nil && response.err == nil { + // The listener worker could close our channel instance + // to indicate that the listener is shut down. + return nil, ErrPipeListenerClosed + } + if errors.Is(response.err, ErrPipeListenerClosed) { + return nil, ErrPipeListenerClosed + } + if response.err == windows.ERROR_NO_DATA { //nolint:errorlint // err is Errno + // If the connection was immediately closed by the client, + // try again (without reporting an error or a dead connection + // to the `Accept()` caller). This avoids spurious + // "The pipe is being closed." messages. + goto tryAgain + } + if response.err != nil { + return nil, response.err } if l.config.MessageMode { return &win32MessageBytePipe{ @@ -549,17 +637,39 @@ func (l *win32PipeListener) Accept() (net.Conn, error) { }, nil } return &win32Pipe{win32File: response.f, path: l.path}, nil - case <-l.doneCh: + case <-l.shutdownFinishedCh: + // The shutdown started and completed while we were waiting for a + // connection. return nil, ErrPipeListenerClosed + + // case <-l.shutdownStartedCh: + // We DO NOT watch for `l.shutdownStartedCh` because we need + // to keep listening on our local `ch` so that the associated + // listener worker can signal it without blocking when throwing + // an ErrPipeListenerClosed error. } } func (l *win32PipeListener) Close() error { + l.closeMux.Lock() select { - case l.closeCh <- 1: - <-l.doneCh - case <-l.doneCh: + case <-l.shutdownFinishedCh: + // The shutdown has already completed. Nothing to do. + default: + select { + case <-l.shutdownStartedCh: + // The shutdown is in progress. We should not get here because + // of the Mutex, but either way, we don't want to race here + // and accidentally close `l.shutdownStartedCh` twice. + default: + // Cause all listener workers to abort. + close(l.shutdownStartedCh) + // Wait for listenerRoutine to stop the workers and clean up. + <-l.shutdownFinishedCh + } } + l.closeMux.Unlock() + return nil } diff --git a/pipe_test.go b/pipe_test.go index ca725183..38538184 100644 --- a/pipe_test.go +++ b/pipe_test.go @@ -8,6 +8,7 @@ import ( "bytes" "context" "errors" + "fmt" "io" "net" "sync" @@ -15,6 +16,7 @@ import ( "time" "unsafe" + "github.com/Microsoft/go-winio/internal/fs" "golang.org/x/sys/windows" ) @@ -643,3 +645,226 @@ func TestListenConnectRace(t *testing.T) { wg.Wait() } } + +// Repeat some of the above tests with `PipeConfig.QueueSize` set +// to verify that we have the same client semantics (timeouts and +// etc.) when we have more than one listener worker. And that +// `l.Close()` shuts down the pipe as expected. There are no calls +// to `l.Accept()`, so all of the `DialPipe()` calls should behave +// the same as the original tests. + +func TestDialListenerTimesOutQueueSize(t *testing.T) { + cfg := PipeConfig{ + QueueSize: 5, + } + l, err := ListenPipe(testPipeName, &cfg) + if err != nil { + t.Fatal(err) + } + defer l.Close() + var d = 10 * time.Millisecond + _, err = DialPipe(testPipeName, &d) + if !errors.Is(err, ErrTimeout) { + t.Fatalf("expected ErrTimeout, got %v", err) + } +} + +func TestDialContextListenerTimesOutQueueSize(t *testing.T) { + cfg := PipeConfig{ + QueueSize: 5, + } + l, err := ListenPipe(testPipeName, &cfg) + if err != nil { + t.Fatal(err) + } + defer l.Close() + var d = 10 * time.Millisecond + ctx, cancel := context.WithTimeout(context.Background(), d) + defer cancel() + _, err = DialPipeContext(ctx, testPipeName) + if !errors.Is(err, context.DeadlineExceeded) { + t.Fatalf("expected context.DeadlineExceeded, got %v", err) + } +} + +func TestDialListenerGetsCancelledQueueSize(t *testing.T) { + cfg := PipeConfig{ + QueueSize: 5, + } + ctx, cancel := context.WithCancel(context.Background()) + l, err := ListenPipe(testPipeName, &cfg) + if err != nil { + t.Fatal(err) + } + ch := make(chan error) + defer l.Close() + go func(ctx context.Context, ch chan error) { + _, err := DialPipeContext(ctx, testPipeName) + ch <- err + }(ctx, ch) + time.Sleep(time.Millisecond * 30) + cancel() + err = <-ch + if !errors.Is(err, context.Canceled) { + t.Fatalf("expected context.Canceled, got %v", err) + } +} + +// tryRawDialPipe is a minimal version of `tryDialPipe()` without +// any of the timeout, cancel, or automatic retry-when-busy logic. +// It just asks the OS to try to open a raw client pipe. +// +// We want this to be as fast as possible and similar to what +// clients in other languages (like "C") would be doing. +func tryRawDialPipe(path string, access fs.AccessMask) (windows.Handle, error) { + h, err := fs.CreateFile(path, + access, + 0, // mode + nil, // security attributes + fs.OPEN_EXISTING, + fs.FILE_FLAG_OVERLAPPED|fs.SECURITY_SQOS_PRESENT|fs.SECURITY_ANONYMOUS, + 0, // template file handle + ) + return h, err +} + +// startAcceptPool creates a pool of "application" threads to accept +// and process pipe connections from clients. +// +// The `poolSize` gives the maximum number of concurrent `Accept()` +// calls that are open. The `poolSize` should (probably) be equal +// to or greater than the pipe layer `queueSize`. The oldest +// `queueSize` of the `Accept()` calls will be mapped (randomly) +// to a listener worker instance (by nature of the buffered accept +// channel). +// +// If the `poolSize` is less than the `queueSize` the pipe layer +// will not be able to efficiently use the larger queue and worker +// threads (because of accept channel blocking). +func startAcceptPool(t *testing.T, poolSize int, wgJoin *sync.WaitGroup, + wgReady *sync.WaitGroup, l net.Listener) { + t.Helper() + + for k := 0; k < poolSize; k++ { + wgJoin.Add(1) + wgReady.Add(1) + go func() { + defer wgJoin.Done() + wgReady.Done() + + finished := false + for !finished { + s, err := l.Accept() + if err == nil { + // A real server application would either process + // this connection or dispatch it to some business + // logic to actually do something before looping + // to accept another connection. + s.Close() + } else if errors.Is(err, ErrPipeListenerClosed) { + finished = true + } else { + t.Error(err) + finished = true + } + } + }() + } +} + +// rapidlyDial is a stress test to rapidly create (and close) a +// series of client pipes and return statistics. +// +// We don't complain about pipe busy errors here because we cannot +// completely eliminate them (even with multiple listener workers). +func rapidlyDial(nrAttempts int) (nrOK int, nrBusy int, nrOther int) { + for j := 0; j < nrAttempts; j++ { + time.Sleep(10 * time.Microsecond) // just enough to let another thread run. + h, err := tryRawDialPipe(testPipeName, fs.GENERIC_READ|fs.GENERIC_WRITE) + if err == nil { + windows.Close(h) + nrOK++ + } else if err == windows.ERROR_PIPE_BUSY { //nolint:errorlint // err is Errno + nrBusy++ + } else { + nrOther++ + } + } + + return nrOK, nrBusy, nrOther +} + +func tryConnectionStress(t *testing.T, queueSize int, poolSize int, nrAttempts int) (nrOK int, nrBusy int, nrOther int) { + t.Helper() + + cfg := PipeConfig{ + QueueSize: int32(queueSize), + } + l, err := ListenPipe(testPipeName, &cfg) + if err != nil { + t.Fatal(err) + } + var wgJoin sync.WaitGroup + var wgReady sync.WaitGroup + startAcceptPool(t, poolSize, &wgJoin, &wgReady, l) + + // Wait for the accept pool to get started before we let + // clients try to hit it. This is more to model a real + // long-runing server that would boot up and then start + // accepting connections from multiple client processes. + // + // Another way to say that is that it doesn't do any good + // to blast thru 1000 client attempts before the accept + // pool and/or the underlying pipe listener workers have + // started (because of cooperative scheduling quirks). + wgReady.Wait() + + nrOK, nrBusy, nrOther = rapidlyDial(nrAttempts) + + l.Close() + wgJoin.Wait() + + return nrOK, nrBusy, nrOther +} + +func TestStressAcceptPool(t *testing.T) { + var tests = []struct { + qs int + ps int + na int + }{ + // A queue size of zero (or 1) is the legacy behavior + // where there is at most 1 unbound server pipe in the + // file system. The pool size doesn't really matter + // because the (ps - qs - 1) Accept() instances will + // be blocked on the channel. + {0, 1, 100}, + {0, 5, 100}, + + // Larger queue sizes enable at most `qs` unbound + // server pipes in the file system. Actually, because + // the pipe code waits for the arrival of an Accept() + // call before creating a new unbound pipe, the file + // system will have at most `min(qs,ps)` unbound pipes. + // So the {5,1,...} should behave like the {0,1,...} + // but with a bit more thread overhead. + {5, 1, 100}, + + // Why we are here cases. We should be able to hit the + // server hard and never get a busy signal. We don't + // assert that because it is still theoretically possible, + // but should now be very unlikely. + {5, 5, 100}, + {5, 10, 100}, + {10, 20, 1000}, + } + + for _, ti := range tests { + nrOK, nrBusy, nrOther := tryConnectionStress(t, ti.qs, ti.ps, ti.na) + name := fmt.Sprintf("case[qs %d][ps %d][na %d]", ti.qs, ti.ps, ti.na) + fmt.Printf("%s: [nrOK %d][nrBusy %d][nrOther %d]\n", name, nrOK, nrBusy, nrOther) + if nrOther > 0 { + t.Errorf("%s: unexpected Accept() errors", name) + } + } +}