From d97304691a754e136519af9b8d16ee0641a048a7 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Tue, 16 Feb 2021 13:48:11 +0800 Subject: [PATCH] remove keep-alives Keeping a connection alive is a responsibility of the underlying transport, not of the stream multiplexer. --- README.md | 2 - const.go | 3 -- mux.go | 12 ------ session.go | 72 ------------------------------------ session_norace_test.go | 2 +- session_test.go | 83 +++--------------------------------------- 6 files changed, 6 insertions(+), 168 deletions(-) diff --git a/README.md b/README.md index 4c8988a..9751f4d 100644 --- a/README.md +++ b/README.md @@ -14,8 +14,6 @@ Yamux features include: * Flow control * Avoid starvation * Back-pressure to prevent overwhelming a receiver -* Keep Alives - * Enables persistent connections over a load balancer * Efficient * Enables thousands of logical streams with low overhead diff --git a/const.go b/const.go index 9e11ba3..c0a827b 100644 --- a/const.go +++ b/const.go @@ -65,9 +65,6 @@ var ( // ErrConnectionWriteTimeout indicates that we hit the "safety valve" // timeout writing to the underlying stream connection. ErrConnectionWriteTimeout = &Error{msg: "connection write timeout", timeout: true} - - // ErrKeepAliveTimeout is sent if a missed keepalive caused the stream close - ErrKeepAliveTimeout = &Error{msg: "keepalive timeout", timeout: true} ) const ( diff --git a/mux.go b/mux.go index 8b9b154..317d1a5 100644 --- a/mux.go +++ b/mux.go @@ -17,13 +17,6 @@ type Config struct { // PingBacklog is used to limit how many ping acks we can queue. PingBacklog int - // EnableKeepalive is used to do a period keep alive - // messages using a ping. - EnableKeepAlive bool - - // KeepAliveInterval is how often to perform the keep alive - KeepAliveInterval time.Duration - // ConnectionWriteTimeout is meant to be a "safety valve" timeout after // we which will suspect a problem with the underlying connection and // close it. This is only applied to writes, where's there's generally @@ -57,8 +50,6 @@ func DefaultConfig() *Config { return &Config{ AcceptBacklog: 256, PingBacklog: 32, - EnableKeepAlive: true, - KeepAliveInterval: 30 * time.Second, ConnectionWriteTimeout: 10 * time.Second, MaxStreamWindowSize: initialStreamWindow, LogOutput: os.Stderr, @@ -73,9 +64,6 @@ func VerifyConfig(config *Config) error { if config.AcceptBacklog <= 0 { return fmt.Errorf("backlog must be positive") } - if config.KeepAliveInterval == 0 { - return fmt.Errorf("keep-alive interval must be positive") - } if config.MaxStreamWindowSize < initialStreamWindow { return fmt.Errorf("MaxStreamWindowSize must be larger than %d", initialStreamWindow) } diff --git a/session.go b/session.go index 55b4e28..5a25f9f 100644 --- a/session.go +++ b/session.go @@ -88,12 +88,6 @@ type Session struct { shutdownErr error shutdownCh chan struct{} shutdownLock sync.Mutex - - // keepaliveTimer is a periodic timer for keepalive messages. It's nil - // when keepalives are disabled. - keepaliveLock sync.Mutex - keepaliveTimer *time.Timer - keepaliveActive bool } // newSession is used to construct a new session @@ -124,9 +118,6 @@ func newSession(config *Config, conn net.Conn, client bool, readBuf int) *Sessio } else { s.nextStreamID = 2 } - if config.EnableKeepAlive { - s.startKeepalive() - } go s.recv() go s.send() return s @@ -255,7 +246,6 @@ func (s *Session) Close() error { } close(s.shutdownCh) s.conn.Close() - s.stopKeepalive() <-s.recvDoneCh <-s.sendDoneCh @@ -350,62 +340,6 @@ func (s *Session) Ping() (dur time.Duration, err error) { return time.Since(start), nil } -// startKeepalive starts the keepalive process. -func (s *Session) startKeepalive() { - s.keepaliveLock.Lock() - defer s.keepaliveLock.Unlock() - s.keepaliveTimer = time.AfterFunc(s.config.KeepAliveInterval, func() { - s.keepaliveLock.Lock() - if s.keepaliveTimer == nil || s.keepaliveActive { - // keepalives have been stopped or a keepalive is active. - s.keepaliveLock.Unlock() - return - } - s.keepaliveActive = true - s.keepaliveLock.Unlock() - - _, err := s.Ping() - - s.keepaliveLock.Lock() - s.keepaliveActive = false - if s.keepaliveTimer != nil { - s.keepaliveTimer.Reset(s.config.KeepAliveInterval) - } - s.keepaliveLock.Unlock() - - if err != nil { - s.logger.Printf("[ERR] yamux: keepalive failed: %v", err) - s.exitErr(ErrKeepAliveTimeout) - } - }) -} - -// stopKeepalive stops the keepalive process. -func (s *Session) stopKeepalive() { - s.keepaliveLock.Lock() - defer s.keepaliveLock.Unlock() - if s.keepaliveTimer != nil { - s.keepaliveTimer.Stop() - s.keepaliveTimer = nil - } -} - -func (s *Session) extendKeepalive() { - s.keepaliveLock.Lock() - if s.keepaliveTimer != nil && !s.keepaliveActive { - // Don't stop the timer and drain the channel. This is an - // AfterFunc, not a normal timer, and any attempts to drain the - // channel will block forever. - // - // Go will stop the timer for us internally anyways. The docs - // say one must stop the timer before calling reset but that's - // to ensure that the timer doesn't end up firing immediately - // after calling Reset. - s.keepaliveTimer.Reset(s.config.KeepAliveInterval) - } - s.keepaliveLock.Unlock() -} - // send sends the header and body. func (s *Session) sendMsg(hdr header, body []byte, deadline <-chan struct{}) error { select { @@ -571,12 +505,6 @@ func (s *Session) recvLoop() error { return err } - // Reset the keepalive timer every time we receive data. - // There's no reason to keepalive if we're active. Worse, if the - // peer is busy sending us stuff, the pong might get stuck - // behind a bunch of data. - s.extendKeepalive() - // Verify the version if hdr.Version() != protoVersion { s.logger.Printf("[ERR] yamux: Invalid protocol version: %d", hdr.Version()) diff --git a/session_norace_test.go b/session_norace_test.go index 4c45bd7..67ccb93 100644 --- a/session_norace_test.go +++ b/session_norace_test.go @@ -13,7 +13,7 @@ import ( ) func TestSession_PingOfDeath(t *testing.T) { - conf := testConfNoKeepAlive() + conf := testConf() // This test is slow and can easily time out on writes on CI. // // In the future, we might want to prioritize ping-replies over even diff --git a/session_test.go b/session_test.go index 7f99f98..ef055c1 100644 --- a/session_test.go +++ b/session_test.go @@ -97,17 +97,10 @@ func testConn() (conn1, conn2 net.Conn) { func testConf() *Config { conf := DefaultConfig() conf.AcceptBacklog = 64 - conf.KeepAliveInterval = 100 * time.Millisecond conf.ConnectionWriteTimeout = 350 * time.Millisecond return conf } -func testConfNoKeepAlive() *Config { - conf := testConf() - conf.EnableKeepAlive = false - return conf -} - func testClientServer() (*Session, *Session) { return testClientServerConfig(testConf()) } @@ -993,71 +986,6 @@ func TestBacklogExceeded(t *testing.T) { } } -func TestKeepAlive(t *testing.T) { - client, server := testClientServer() - defer client.Close() - defer server.Close() - - time.Sleep(200 * time.Millisecond) - - // Ping value should increase - client.pingLock.Lock() - defer client.pingLock.Unlock() - if client.pingID == 0 { - t.Fatalf("should ping") - } - - server.pingLock.Lock() - defer server.pingLock.Unlock() - if server.pingID == 0 { - t.Fatalf("should ping") - } -} - -func TestKeepAlive_Timeout(t *testing.T) { - conn1, conn2 := testConn() - - clientConf := testConf() - clientConf.ConnectionWriteTimeout = time.Hour // We're testing keep alives, not connection writes - clientConf.EnableKeepAlive = false // Just test one direction, so it's deterministic who hangs up on whom - client, _ := Client(conn1, clientConf) - defer client.Close() - - serverLogs := new(logCapture) - serverConf := testConf() - serverConf.LogOutput = serverLogs - - server, _ := Server(conn2, serverConf) - defer server.Close() - - errCh := make(chan error, 1) - go func() { - _, err := server.Accept() // Wait until server closes - errCh <- err - }() - - // Prevent the client from responding - clientConn := client.conn.(*pipeConn) - clientConn.BlockWrites() - - select { - case err := <-errCh: - if err != ErrKeepAliveTimeout { - t.Fatalf("unexpected error: %v", err) - } - case <-time.After(1 * time.Second): - t.Fatalf("timeout waiting for timeout") - } - - if !server.IsClosed() { - t.Fatalf("server should have closed") - } - - if !serverLogs.match([]string{"[ERR] yamux: keepalive failed: i/o deadline reached"}) { - t.Fatalf("server log incorect: %v", serverLogs.logs()) - } -} - type UnlimitedReader struct{} func (u *UnlimitedReader) Read(p []byte) (int, error) { @@ -1100,7 +1028,7 @@ func TestBacklogExceeded_Accept(t *testing.T) { } func TestSession_WindowUpdateWriteDuringRead(t *testing.T) { - client, server := testClientServerConfig(testConfNoKeepAlive()) + client, server := testClientServerConfig(testConf()) defer client.Close() defer server.Close() @@ -1163,7 +1091,7 @@ func TestSession_WindowUpdateWriteDuringRead(t *testing.T) { } func TestSession_PartialReadWindowUpdate(t *testing.T) { - client, server := testClientServerConfig(testConfNoKeepAlive()) + client, server := testClientServerConfig(testConf()) defer client.Close() defer server.Close() @@ -1238,7 +1166,7 @@ func TestSession_PartialReadWindowUpdate(t *testing.T) { } func TestSession_sendMsg_Timeout(t *testing.T) { - client, server := testClientServerConfig(testConfNoKeepAlive()) + client, server := testClientServerConfig(testConf()) defer client.Close() defer server.Close() @@ -1265,7 +1193,7 @@ func TestWindowOverflow(t *testing.T) { // 2. We unlock after resetting the stream. for i := uint32(1); i < 100; i += 2 { func() { - client, server := testClientServerConfig(testConfNoKeepAlive()) + client, server := testClientServerConfig(testConf()) defer client.Close() defer server.Close() @@ -1287,7 +1215,7 @@ func TestWindowOverflow(t *testing.T) { } func TestSession_ConnectionWriteTimeout(t *testing.T) { - client, server := testClientServerConfig(testConfNoKeepAlive()) + client, server := testClientServerConfig(testConf()) defer client.Close() defer server.Close() @@ -1463,7 +1391,6 @@ func TestStreamResetRead(t *testing.T) { func TestLotsOfWritesWithStreamDeadline(t *testing.T) { config := testConf() - config.EnableKeepAlive = false client, server := testClientServerConfig(config) defer client.Close()