diff --git a/pipe.go b/pipe.go index 9ecbbc04..4cd99908 100644 --- a/pipe.go +++ b/pipe.go @@ -11,6 +11,7 @@ import ( "net" "os" "runtime" + "sync" "time" "unsafe" @@ -297,9 +298,30 @@ type win32PipeListener struct { firstHandle windows.Handle path string config PipeConfig - acceptCh chan (chan acceptResponse) - closeCh chan int - doneCh chan int + + // `acceptQueueCh` is a buffered channel (of channels). Calls to + // Accept() will append to this queue to schedule a listener-worker + // to create a new named pipe instance in the named pipe file system + // (NPFS) and then listen for a connection from a client. + // + // The resulting connected pipe (or error) will be signalled (back + // to `Accept()`) on the channel value's channel. + acceptQueueCh chan (chan acceptResponse) + + // `shutdownStartedCh` will be closed to indicate that all listener + // workers should shutdown. `l.Close()` will signal this to begin + // a shutdown. + shutdownStartedCh chan struct{} + + // `shutdownFinishedCh` will be closed to indicate that `l.listenerRoutine()` + // has stopped all of the listener worker threads and has finished the + // shutdown. `l.Close()` must wait for this signal before returning. + shutdownFinishedCh chan struct{} + + // `closeMux` is used to create a critical section in `l.Close()` and + // coordinate the shutdown and prevent problems if a second thread calls + // `l.Close()` while a shutdown is in progress. + closeMux sync.Mutex } func makeServerPipeHandle(path string, sd []byte, c *PipeConfig, first bool) (windows.Handle, error) { @@ -426,45 +448,56 @@ func (l *win32PipeListener) makeConnectedServerPipe() (*win32File, error) { p.Close() p = nil } - case <-l.closeCh: + case <-l.shutdownStartedCh: // Abort the connect request by closing the handle. p.Close() p = nil err = <-ch - if err == nil || err == ErrFileClosed { //nolint:errorlint // err is Errno + if err == nil || err == ErrFileClosed || err == windows.ERROR_OPERATION_ABORTED { //nolint:errorlint // err is Errno err = ErrPipeListenerClosed } } return p, err } -func (l *win32PipeListener) listenerRoutine() { - closed := false - for !closed { +func (l *win32PipeListener) listenerWorker(wg *sync.WaitGroup) { + var stop bool + for !stop { select { - case <-l.closeCh: - closed = true - case responseCh := <-l.acceptCh: - var ( - p *win32File - err error - ) - for { - p, err = l.makeConnectedServerPipe() - // If the connection was immediately closed by the client, try - // again. - if err != windows.ERROR_NO_DATA { //nolint:errorlint // err is Errno - break - } - } + case <-l.shutdownStartedCh: + stop = true + case responseCh := <-l.acceptQueueCh: + p, err := l.makeConnectedServerPipe() responseCh <- acceptResponse{p, err} - closed = err == ErrPipeListenerClosed //nolint:errorlint // err is Errno } } + + wg.Done() +} + +func (l *win32PipeListener) listenerRoutine(queueSize int) { + var wg sync.WaitGroup + + for k := 0; k < queueSize; k++ { + wg.Add(1) + go l.listenerWorker(&wg) + } + + wg.Wait() // for all listenerWorkers to finish. + + // We can assert here that `l.shutdownStartedCh` has been + // signalled (since `l.Close()` closed it). + // + // We might consider draining the `l.acceptQueueCh` and + // closing each of the channel instances, but that is not + // necessary since the second "select" in `l.Accept()` is + // waiting on the `requestCh` and `l.shutdownFinishedCh`. + // And we're going to signal the latter in a moment. 
+ windows.Close(l.firstHandle) l.firstHandle = 0 // Notify Close() and Accept() callers that the handle has been closed. - close(l.doneCh) + close(l.shutdownFinishedCh) } // PipeConfig contain configuration for the pipe listener. @@ -485,6 +518,19 @@ type PipeConfig struct { // OutputBufferSize specifies the size of the output buffer, in bytes. OutputBufferSize int32 + + // QueueSize specifies the maximum number of concurrently active pipe server + // handles to allow. This is conceptually similar to the `backlog` argument + // to `listen(2)` on Unix systems. Increasing this value reduces the likelyhood + // of a connecting client receiving a `windows.ERROR_PIPE_BUSY` error. + // (Assuming that the server is written to call `l.Accept()` using a pool of + // application worker threads.) + // + // This value should be larger than your expected client arrival rate so that + // there are always a few extra listener worker threads and (more importantly) + // unbound server pipes in the kernel, so that a client "CreateFile()" should + // not get a busy signal. + QueueSize int32 } // ListenPipe creates a listener on a Windows named pipe path, e.g. \\.\pipe\mypipe. @@ -503,19 +549,30 @@ func ListenPipe(path string, c *PipeConfig) (net.Listener, error) { return nil, err } } + + queueSize := int(c.QueueSize) + if queueSize < 1 { + // Legacy calls will pass 0 since they won't know to set the queue size. + // Default to legacy behavior where we never have more than 1 available + // unbound pipe and that is only present when an application thread is + // blocked in `l.Accept()`. + queueSize = 1 + } + h, err := makeServerPipeHandle(path, sd, c, true) if err != nil { return nil, err } l := &win32PipeListener{ - firstHandle: h, - path: path, - config: *c, - acceptCh: make(chan (chan acceptResponse)), - closeCh: make(chan int), - doneCh: make(chan int), - } - go l.listenerRoutine() + firstHandle: h, + path: path, + config: *c, + acceptQueueCh: make(chan chan acceptResponse, queueSize), + shutdownStartedCh: make(chan struct{}), + shutdownFinishedCh: make(chan struct{}), + closeMux: sync.Mutex{}, + } + go l.listenerRoutine(queueSize) return l, nil } @@ -535,13 +592,44 @@ func connectPipe(p *win32File) error { } func (l *win32PipeListener) Accept() (net.Conn, error) { +tryAgain: ch := make(chan acceptResponse) + select { - case l.acceptCh <- ch: - response := <-ch - err := response.err - if err != nil { - return nil, err + case l.acceptQueueCh <- ch: + // We have queued a request for a worker thread to listen + // for a connection. + case <-l.shutdownFinishedCh: + // The shutdown completed before we could request a connection. + return nil, ErrPipeListenerClosed + case <-l.shutdownStartedCh: + // The shutdown is already in progress. Don't bother trying to + // schedule a new request. + return nil, ErrPipeListenerClosed + } + + // We queued a request. Now wait for a connection signal or a + // shutdown while we were waiting. + + select { + case response := <-ch: + if response.f == nil && response.err == nil { + // The listener worker could close our channel instance + // to indicate that the listener is shut down. + return nil, ErrPipeListenerClosed + } + if errors.Is(response.err, ErrPipeListenerClosed) { + return nil, ErrPipeListenerClosed + } + if response.err == windows.ERROR_NO_DATA { //nolint:errorlint // err is Errno + // If the connection was immediately closed by the client, + // try again (without reporting an error or a dead connection + // to the `Accept()` caller). 
This avoids spurious + // "The pipe is being closed." messages. + goto tryAgain + } + if response.err != nil { + return nil, response.err } if l.config.MessageMode { return &win32MessageBytePipe{ @@ -549,17 +637,39 @@ func (l *win32PipeListener) Accept() (net.Conn, error) { }, nil } return &win32Pipe{win32File: response.f, path: l.path}, nil - case <-l.doneCh: + case <-l.shutdownFinishedCh: + // The shutdown started and completed while we were waiting for a + // connection. return nil, ErrPipeListenerClosed + + // case <-l.shutdownStartedCh: + // We DO NOT watch for `l.shutdownStartedCh` because we need + // to keep listening on our local `ch` so that the associated + // listener worker can signal it without blocking when throwing + // an ErrPipeListenerClosed error. } } func (l *win32PipeListener) Close() error { + l.closeMux.Lock() select { - case l.closeCh <- 1: - <-l.doneCh - case <-l.doneCh: + case <-l.shutdownFinishedCh: + // The shutdown has already completed. Nothing to do. + default: + select { + case <-l.shutdownStartedCh: + // The shutdown is in progress. We should not get here because + // of the Mutex, but either way, we don't want to race here + // and accidentally close `l.shutdownStartedCh` twice. + default: + // Cause all listener workers to abort. + close(l.shutdownStartedCh) + // Wait for listenerRoutine to stop the workers and clean up. + <-l.shutdownFinishedCh + } } + l.closeMux.Unlock() + return nil } diff --git a/pipe_test.go b/pipe_test.go index ca725183..38538184 100644 --- a/pipe_test.go +++ b/pipe_test.go @@ -8,6 +8,7 @@ import ( "bytes" "context" "errors" + "fmt" "io" "net" "sync" @@ -15,6 +16,7 @@ import ( "time" "unsafe" + "github.com/Microsoft/go-winio/internal/fs" "golang.org/x/sys/windows" ) @@ -643,3 +645,226 @@ func TestListenConnectRace(t *testing.T) { wg.Wait() } } + +// Repeat some of the above tests with `PipeConfig.QueueSize` set +// to verify that we have the same client semantics (timeouts and +// etc.) when we have more than one listener worker. And that +// `l.Close()` shuts down the pipe as expected. There are no calls +// to `l.Accept()`, so all of the `DialPipe()` calls should behave +// the same as the original tests. 
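Editorial note: the new QueueSize field is only useful if the application keeps several Accept() calls outstanding, as the pipe.go comments above describe. The following is a minimal, hedged sketch of that server-side pattern, not part of this patch: the pipe name, queue size, pool size, and handleConn are placeholder names chosen for illustration, and the code is Windows-only like the rest of the package. The startAcceptPool helper in the tests below does essentially the same thing.

package main

import (
	"errors"
	"log"
	"net"
	"sync"

	winio "github.com/Microsoft/go-winio"
)

func main() {
	// QueueSize > 1 keeps several unbound server pipe instances in the
	// named pipe file system, reducing ERROR_PIPE_BUSY seen by clients.
	// The pipe name and sizes here are arbitrary example values.
	l, err := winio.ListenPipe(`\\.\pipe\example`, &winio.PipeConfig{QueueSize: 8})
	if err != nil {
		log.Fatal(err)
	}

	const poolSize = 8 // should be >= QueueSize so the accept queue stays full
	var wg sync.WaitGroup
	for i := 0; i < poolSize; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				c, err := l.Accept()
				if errors.Is(err, winio.ErrPipeListenerClosed) {
					return // listener was shut down
				}
				if err != nil {
					log.Print(err)
					return
				}
				go handleConn(c)
			}
		}()
	}

	// ... run until shutdown is requested, then:
	l.Close() // wakes every blocked Accept() with ErrPipeListenerClosed
	wg.Wait()
}

// handleConn is a placeholder for the application protocol.
func handleConn(c net.Conn) {
	defer c.Close()
}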
+ +func TestDialListenerTimesOutQueueSize(t *testing.T) { + cfg := PipeConfig{ + QueueSize: 5, + } + l, err := ListenPipe(testPipeName, &cfg) + if err != nil { + t.Fatal(err) + } + defer l.Close() + var d = 10 * time.Millisecond + _, err = DialPipe(testPipeName, &d) + if !errors.Is(err, ErrTimeout) { + t.Fatalf("expected ErrTimeout, got %v", err) + } +} + +func TestDialContextListenerTimesOutQueueSize(t *testing.T) { + cfg := PipeConfig{ + QueueSize: 5, + } + l, err := ListenPipe(testPipeName, &cfg) + if err != nil { + t.Fatal(err) + } + defer l.Close() + var d = 10 * time.Millisecond + ctx, cancel := context.WithTimeout(context.Background(), d) + defer cancel() + _, err = DialPipeContext(ctx, testPipeName) + if !errors.Is(err, context.DeadlineExceeded) { + t.Fatalf("expected context.DeadlineExceeded, got %v", err) + } +} + +func TestDialListenerGetsCancelledQueueSize(t *testing.T) { + cfg := PipeConfig{ + QueueSize: 5, + } + ctx, cancel := context.WithCancel(context.Background()) + l, err := ListenPipe(testPipeName, &cfg) + if err != nil { + t.Fatal(err) + } + ch := make(chan error) + defer l.Close() + go func(ctx context.Context, ch chan error) { + _, err := DialPipeContext(ctx, testPipeName) + ch <- err + }(ctx, ch) + time.Sleep(time.Millisecond * 30) + cancel() + err = <-ch + if !errors.Is(err, context.Canceled) { + t.Fatalf("expected context.Canceled, got %v", err) + } +} + +// tryRawDialPipe is a minimal version of `tryDialPipe()` without +// any of the timeout, cancel, or automatic retry-when-busy logic. +// It just asks the OS to try to open a raw client pipe. +// +// We want this to be as fast as possible and similar to what +// clients in other languages (like "C") would be doing. +func tryRawDialPipe(path string, access fs.AccessMask) (windows.Handle, error) { + h, err := fs.CreateFile(path, + access, + 0, // mode + nil, // security attributes + fs.OPEN_EXISTING, + fs.FILE_FLAG_OVERLAPPED|fs.SECURITY_SQOS_PRESENT|fs.SECURITY_ANONYMOUS, + 0, // template file handle + ) + return h, err +} + +// startAcceptPool creates a pool of "application" threads to accept +// and process pipe connections from clients. +// +// The `poolSize` gives the maximum number of concurrent `Accept()` +// calls that are open. The `poolSize` should (probably) be equal +// to or greater than the pipe layer `queueSize`. The oldest +// `queueSize` of the `Accept()` calls will be mapped (randomly) +// to a listener worker instance (by nature of the buffered accept +// channel). +// +// If the `poolSize` is less than the `queueSize` the pipe layer +// will not be able to efficiently use the larger queue and worker +// threads (because of accept channel blocking). +func startAcceptPool(t *testing.T, poolSize int, wgJoin *sync.WaitGroup, + wgReady *sync.WaitGroup, l net.Listener) { + t.Helper() + + for k := 0; k < poolSize; k++ { + wgJoin.Add(1) + wgReady.Add(1) + go func() { + defer wgJoin.Done() + wgReady.Done() + + finished := false + for !finished { + s, err := l.Accept() + if err == nil { + // A real server application would either process + // this connection or dispatch it to some business + // logic to actually do something before looping + // to accept another connection. + s.Close() + } else if errors.Is(err, ErrPipeListenerClosed) { + finished = true + } else { + t.Error(err) + finished = true + } + } + }() + } +} + +// rapidlyDial is a stress test to rapidly create (and close) a +// series of client pipes and return statistics. 
+// +// We don't complain about pipe busy errors here because we cannot +// completely eliminate them (even with multiple listener workers). +func rapidlyDial(nrAttempts int) (nrOK int, nrBusy int, nrOther int) { + for j := 0; j < nrAttempts; j++ { + time.Sleep(10 * time.Microsecond) // just enough to let another thread run. + h, err := tryRawDialPipe(testPipeName, fs.GENERIC_READ|fs.GENERIC_WRITE) + if err == nil { + windows.Close(h) + nrOK++ + } else if err == windows.ERROR_PIPE_BUSY { //nolint:errorlint // err is Errno + nrBusy++ + } else { + nrOther++ + } + } + + return nrOK, nrBusy, nrOther +} + +func tryConnectionStress(t *testing.T, queueSize int, poolSize int, nrAttempts int) (nrOK int, nrBusy int, nrOther int) { + t.Helper() + + cfg := PipeConfig{ + QueueSize: int32(queueSize), + } + l, err := ListenPipe(testPipeName, &cfg) + if err != nil { + t.Fatal(err) + } + var wgJoin sync.WaitGroup + var wgReady sync.WaitGroup + startAcceptPool(t, poolSize, &wgJoin, &wgReady, l) + + // Wait for the accept pool to get started before we let + // clients try to hit it. This is more to model a real + // long-runing server that would boot up and then start + // accepting connections from multiple client processes. + // + // Another way to say that is that it doesn't do any good + // to blast thru 1000 client attempts before the accept + // pool and/or the underlying pipe listener workers have + // started (because of cooperative scheduling quirks). + wgReady.Wait() + + nrOK, nrBusy, nrOther = rapidlyDial(nrAttempts) + + l.Close() + wgJoin.Wait() + + return nrOK, nrBusy, nrOther +} + +func TestStressAcceptPool(t *testing.T) { + var tests = []struct { + qs int + ps int + na int + }{ + // A queue size of zero (or 1) is the legacy behavior + // where there is at most 1 unbound server pipe in the + // file system. The pool size doesn't really matter + // because the (ps - qs - 1) Accept() instances will + // be blocked on the channel. + {0, 1, 100}, + {0, 5, 100}, + + // Larger queue sizes enable at most `qs` unbound + // server pipes in the file system. Actually, because + // the pipe code waits for the arrival of an Accept() + // call before creating a new unbound pipe, the file + // system will have at most `min(qs,ps)` unbound pipes. + // So the {5,1,...} should behave like the {0,1,...} + // but with a bit more thread overhead. + {5, 1, 100}, + + // Why we are here cases. We should be able to hit the + // server hard and never get a busy signal. We don't + // assert that because it is still theoretically possible, + // but should now be very unlikely. + {5, 5, 100}, + {5, 10, 100}, + {10, 20, 1000}, + } + + for _, ti := range tests { + nrOK, nrBusy, nrOther := tryConnectionStress(t, ti.qs, ti.ps, ti.na) + name := fmt.Sprintf("case[qs %d][ps %d][na %d]", ti.qs, ti.ps, ti.na) + fmt.Printf("%s: [nrOK %d][nrBusy %d][nrOther %d]\n", name, nrOK, nrBusy, nrOther) + if nrOther > 0 { + t.Errorf("%s: unexpected Accept() errors", name) + } + } +}
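Editorial note: for completeness, a hedged sketch of the client side that these tests model. It uses only the existing public dial APIs (DialPipe with a timeout, DialPipeContext with a context); the pipe name and durations are placeholder values. Unlike the raw tryRawDialPipe helper above, DialPipe and DialPipeContext already retry while the pipe reports busy, so a larger server QueueSize mostly shows up as fewer internal retries rather than a different client API.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	winio "github.com/Microsoft/go-winio"
)

func main() {
	const pipeName = `\\.\pipe\example` // placeholder pipe name

	// Dial with an overall timeout. ErrTimeout is returned if no server
	// pipe instance becomes available in time.
	timeout := 250 * time.Millisecond
	c, err := winio.DialPipe(pipeName, &timeout)
	if errors.Is(err, winio.ErrTimeout) {
		fmt.Println("server busy or not listening:", err)
	} else if err == nil {
		c.Close()
	}

	// Or dial with a context, which allows cancellation as well as deadlines.
	ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
	defer cancel()
	c, err = winio.DialPipeContext(ctx, pipeName)
	if errors.Is(err, context.DeadlineExceeded) {
		fmt.Println("dial deadline exceeded:", err)
	} else if err == nil {
		c.Close()
	}
}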