Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove keep-alives #46

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 0 additions & 2 deletions README.md
Expand Up @@ -14,8 +14,6 @@ Yamux features include:
* Flow control
* Avoid starvation
* Back-pressure to prevent overwhelming a receiver
* Keep Alives
* Enables persistent connections over a load balancer
* Efficient
* Enables thousands of logical streams with low overhead

Expand Down
3 changes: 0 additions & 3 deletions const.go
Expand Up @@ -65,9 +65,6 @@ var (
// ErrConnectionWriteTimeout indicates that we hit the "safety valve"
// timeout writing to the underlying stream connection.
ErrConnectionWriteTimeout = &Error{msg: "connection write timeout", timeout: true}

// ErrKeepAliveTimeout is sent if a missed keepalive caused the stream close
ErrKeepAliveTimeout = &Error{msg: "keepalive timeout", timeout: true}
)

const (
Expand Down
12 changes: 0 additions & 12 deletions mux.go
Expand Up @@ -17,13 +17,6 @@ type Config struct {
// PingBacklog is used to limit how many ping acks we can queue.
PingBacklog int

// EnableKeepalive is used to do a period keep alive
// messages using a ping.
EnableKeepAlive bool

// KeepAliveInterval is how often to perform the keep alive
KeepAliveInterval time.Duration

// ConnectionWriteTimeout is meant to be a "safety valve" timeout after
// we which will suspect a problem with the underlying connection and
// close it. This is only applied to writes, where's there's generally
Expand Down Expand Up @@ -57,8 +50,6 @@ func DefaultConfig() *Config {
return &Config{
AcceptBacklog: 256,
PingBacklog: 32,
EnableKeepAlive: true,
KeepAliveInterval: 30 * time.Second,
ConnectionWriteTimeout: 10 * time.Second,
MaxStreamWindowSize: initialStreamWindow,
LogOutput: os.Stderr,
Expand All @@ -73,9 +64,6 @@ func VerifyConfig(config *Config) error {
if config.AcceptBacklog <= 0 {
return fmt.Errorf("backlog must be positive")
}
if config.KeepAliveInterval == 0 {
return fmt.Errorf("keep-alive interval must be positive")
}
if config.MaxStreamWindowSize < initialStreamWindow {
return fmt.Errorf("MaxStreamWindowSize must be larger than %d", initialStreamWindow)
}
Expand Down
72 changes: 0 additions & 72 deletions session.go
Expand Up @@ -88,12 +88,6 @@ type Session struct {
shutdownErr error
shutdownCh chan struct{}
shutdownLock sync.Mutex

// keepaliveTimer is a periodic timer for keepalive messages. It's nil
// when keepalives are disabled.
keepaliveLock sync.Mutex
keepaliveTimer *time.Timer
keepaliveActive bool
}

// newSession is used to construct a new session
Expand Down Expand Up @@ -124,9 +118,6 @@ func newSession(config *Config, conn net.Conn, client bool, readBuf int) *Sessio
} else {
s.nextStreamID = 2
}
if config.EnableKeepAlive {
s.startKeepalive()
}
go s.recv()
go s.send()
return s
Expand Down Expand Up @@ -255,7 +246,6 @@ func (s *Session) Close() error {
}
close(s.shutdownCh)
s.conn.Close()
s.stopKeepalive()
<-s.recvDoneCh
<-s.sendDoneCh

Expand Down Expand Up @@ -350,62 +340,6 @@ func (s *Session) Ping() (dur time.Duration, err error) {
return time.Since(start), nil
}

// startKeepalive starts the keepalive process.
func (s *Session) startKeepalive() {
s.keepaliveLock.Lock()
defer s.keepaliveLock.Unlock()
s.keepaliveTimer = time.AfterFunc(s.config.KeepAliveInterval, func() {
s.keepaliveLock.Lock()
if s.keepaliveTimer == nil || s.keepaliveActive {
// keepalives have been stopped or a keepalive is active.
s.keepaliveLock.Unlock()
return
}
s.keepaliveActive = true
s.keepaliveLock.Unlock()

_, err := s.Ping()

s.keepaliveLock.Lock()
s.keepaliveActive = false
if s.keepaliveTimer != nil {
s.keepaliveTimer.Reset(s.config.KeepAliveInterval)
}
s.keepaliveLock.Unlock()

if err != nil {
s.logger.Printf("[ERR] yamux: keepalive failed: %v", err)
s.exitErr(ErrKeepAliveTimeout)
}
})
}

// stopKeepalive stops the keepalive process.
func (s *Session) stopKeepalive() {
s.keepaliveLock.Lock()
defer s.keepaliveLock.Unlock()
if s.keepaliveTimer != nil {
s.keepaliveTimer.Stop()
s.keepaliveTimer = nil
}
}

func (s *Session) extendKeepalive() {
s.keepaliveLock.Lock()
if s.keepaliveTimer != nil && !s.keepaliveActive {
// Don't stop the timer and drain the channel. This is an
// AfterFunc, not a normal timer, and any attempts to drain the
// channel will block forever.
//
// Go will stop the timer for us internally anyways. The docs
// say one must stop the timer before calling reset but that's
// to ensure that the timer doesn't end up firing immediately
// after calling Reset.
s.keepaliveTimer.Reset(s.config.KeepAliveInterval)
}
s.keepaliveLock.Unlock()
}

// send sends the header and body.
func (s *Session) sendMsg(hdr header, body []byte, deadline <-chan struct{}) error {
select {
Expand Down Expand Up @@ -571,12 +505,6 @@ func (s *Session) recvLoop() error {
return err
}

// Reset the keepalive timer every time we receive data.
// There's no reason to keepalive if we're active. Worse, if the
// peer is busy sending us stuff, the pong might get stuck
// behind a bunch of data.
s.extendKeepalive()

// Verify the version
if hdr.Version() != protoVersion {
s.logger.Printf("[ERR] yamux: Invalid protocol version: %d", hdr.Version())
Expand Down
2 changes: 1 addition & 1 deletion session_norace_test.go
Expand Up @@ -13,7 +13,7 @@ import (
)

func TestSession_PingOfDeath(t *testing.T) {
conf := testConfNoKeepAlive()
conf := testConf()
// This test is slow and can easily time out on writes on CI.
//
// In the future, we might want to prioritize ping-replies over even
Expand Down
83 changes: 5 additions & 78 deletions session_test.go
Expand Up @@ -97,17 +97,10 @@ func testConn() (conn1, conn2 net.Conn) {
func testConf() *Config {
conf := DefaultConfig()
conf.AcceptBacklog = 64
conf.KeepAliveInterval = 100 * time.Millisecond
conf.ConnectionWriteTimeout = 350 * time.Millisecond
return conf
}

func testConfNoKeepAlive() *Config {
conf := testConf()
conf.EnableKeepAlive = false
return conf
}

func testClientServer() (*Session, *Session) {
return testClientServerConfig(testConf())
}
Expand Down Expand Up @@ -993,71 +986,6 @@ func TestBacklogExceeded(t *testing.T) {
}
}

func TestKeepAlive(t *testing.T) {
client, server := testClientServer()
defer client.Close()
defer server.Close()

time.Sleep(200 * time.Millisecond)

// Ping value should increase
client.pingLock.Lock()
defer client.pingLock.Unlock()
if client.pingID == 0 {
t.Fatalf("should ping")
}

server.pingLock.Lock()
defer server.pingLock.Unlock()
if server.pingID == 0 {
t.Fatalf("should ping")
}
}

func TestKeepAlive_Timeout(t *testing.T) {
conn1, conn2 := testConn()

clientConf := testConf()
clientConf.ConnectionWriteTimeout = time.Hour // We're testing keep alives, not connection writes
clientConf.EnableKeepAlive = false // Just test one direction, so it's deterministic who hangs up on whom
client, _ := Client(conn1, clientConf)
defer client.Close()

serverLogs := new(logCapture)
serverConf := testConf()
serverConf.LogOutput = serverLogs

server, _ := Server(conn2, serverConf)
defer server.Close()

errCh := make(chan error, 1)
go func() {
_, err := server.Accept() // Wait until server closes
errCh <- err
}()

// Prevent the client from responding
clientConn := client.conn.(*pipeConn)
clientConn.BlockWrites()

select {
case err := <-errCh:
if err != ErrKeepAliveTimeout {
t.Fatalf("unexpected error: %v", err)
}
case <-time.After(1 * time.Second):
t.Fatalf("timeout waiting for timeout")
}

if !server.IsClosed() {
t.Fatalf("server should have closed")
}

if !serverLogs.match([]string{"[ERR] yamux: keepalive failed: i/o deadline reached"}) {
t.Fatalf("server log incorect: %v", serverLogs.logs())
}
}

type UnlimitedReader struct{}

func (u *UnlimitedReader) Read(p []byte) (int, error) {
Expand Down Expand Up @@ -1100,7 +1028,7 @@ func TestBacklogExceeded_Accept(t *testing.T) {
}

func TestSession_WindowUpdateWriteDuringRead(t *testing.T) {
client, server := testClientServerConfig(testConfNoKeepAlive())
client, server := testClientServerConfig(testConf())
defer client.Close()
defer server.Close()

Expand Down Expand Up @@ -1163,7 +1091,7 @@ func TestSession_WindowUpdateWriteDuringRead(t *testing.T) {
}

func TestSession_PartialReadWindowUpdate(t *testing.T) {
client, server := testClientServerConfig(testConfNoKeepAlive())
client, server := testClientServerConfig(testConf())
defer client.Close()
defer server.Close()

Expand Down Expand Up @@ -1238,7 +1166,7 @@ func TestSession_PartialReadWindowUpdate(t *testing.T) {
}

func TestSession_sendMsg_Timeout(t *testing.T) {
client, server := testClientServerConfig(testConfNoKeepAlive())
client, server := testClientServerConfig(testConf())
defer client.Close()
defer server.Close()

Expand All @@ -1265,7 +1193,7 @@ func TestWindowOverflow(t *testing.T) {
// 2. We unlock after resetting the stream.
for i := uint32(1); i < 100; i += 2 {
func() {
client, server := testClientServerConfig(testConfNoKeepAlive())
client, server := testClientServerConfig(testConf())
defer client.Close()
defer server.Close()

Expand All @@ -1287,7 +1215,7 @@ func TestWindowOverflow(t *testing.T) {
}

func TestSession_ConnectionWriteTimeout(t *testing.T) {
client, server := testClientServerConfig(testConfNoKeepAlive())
client, server := testClientServerConfig(testConf())
defer client.Close()
defer server.Close()

Expand Down Expand Up @@ -1463,7 +1391,6 @@ func TestStreamResetRead(t *testing.T) {

func TestLotsOfWritesWithStreamDeadline(t *testing.T) {
config := testConf()
config.EnableKeepAlive = false

client, server := testClientServerConfig(config)
defer client.Close()
Expand Down