Skip to content

Commit

Permalink
bufconn: Implement read/write deadlines (grpc#2959)
Browse files Browse the repository at this point in the history
  • Loading branch information
Maydell authored and dfawley committed Aug 20, 2019
1 parent 7cc2133 commit 3bb34e5
Show file tree
Hide file tree
Showing 2 changed files with 188 additions and 5 deletions.
74 changes: 69 additions & 5 deletions test/bufconn/bufconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,16 @@ type Listener struct {
done chan struct{}
}

// Implementation of net.Error providing timeout
type netErrorTimeout struct {
error
}

func (e netErrorTimeout) Timeout() bool { return true }
func (e netErrorTimeout) Temporary() bool { return false }

var errClosed = fmt.Errorf("closed")
var errTimeout net.Error = netErrorTimeout{error: fmt.Errorf("i/o timeout")}

// Listen returns a Listener that can only be contacted by its own Dialers and
// creates buffered connections between the two.
Expand Down Expand Up @@ -104,6 +113,13 @@ type pipe struct {
wwait sync.Cond
rwait sync.Cond

// Indicate that a write/read timeout has occurred
wtimedout bool
rtimedout bool

wtimer *time.Timer
rtimer *time.Timer

closed bool
writeClosed bool
}
Expand All @@ -112,6 +128,9 @@ func newPipe(sz int) *pipe {
p := &pipe{buf: make([]byte, 0, sz)}
p.wwait.L = &p.mu
p.rwait.L = &p.mu

p.wtimer = time.AfterFunc(0, func() {})
p.rtimer = time.AfterFunc(0, func() {})
return p
}

Expand All @@ -137,6 +156,10 @@ func (p *pipe) Read(b []byte) (n int, err error) {
if p.writeClosed {
return 0, io.EOF
}
if p.rtimedout {
return 0, errTimeout
}

p.rwait.Wait()
}
wasFull := p.full()
Expand Down Expand Up @@ -171,6 +194,10 @@ func (p *pipe) Write(b []byte) (n int, err error) {
if !p.full() {
break
}
if p.wtimedout {
return 0, errTimeout
}

p.wwait.Wait()
}
wasEmpty := p.empty()
Expand Down Expand Up @@ -232,11 +259,48 @@ func (c *conn) Close() error {
return err2
}

func (*conn) LocalAddr() net.Addr { return addr{} }
func (*conn) RemoteAddr() net.Addr { return addr{} }
func (c *conn) SetDeadline(t time.Time) error { return fmt.Errorf("unsupported") }
func (c *conn) SetReadDeadline(t time.Time) error { return fmt.Errorf("unsupported") }
func (c *conn) SetWriteDeadline(t time.Time) error { return fmt.Errorf("unsupported") }
func (c *conn) SetDeadline(t time.Time) error {
c.SetReadDeadline(t)
c.SetWriteDeadline(t)
return nil
}

func (c *conn) SetReadDeadline(t time.Time) error {
p := c.Reader.(*pipe)
p.mu.Lock()
defer p.mu.Unlock()
p.rtimer.Stop()
p.rtimedout = false
if !t.IsZero() {
p.rtimer = time.AfterFunc(time.Until(t), func() {
p.mu.Lock()
defer p.mu.Unlock()
p.rtimedout = true
p.rwait.Broadcast()
})
}
return nil
}

func (c *conn) SetWriteDeadline(t time.Time) error {
p := c.Writer.(*pipe)
p.mu.Lock()
defer p.mu.Unlock()
p.wtimer.Stop()
p.wtimedout = false
if !t.IsZero() {
p.wtimer = time.AfterFunc(time.Until(t), func() {
p.mu.Lock()
defer p.mu.Unlock()
p.wtimedout = true
p.wwait.Broadcast()
})
}
return nil
}

func (*conn) LocalAddr() net.Addr { return addr{} }
func (*conn) RemoteAddr() net.Addr { return addr{} }

type addr struct{}

Expand Down
119 changes: 119 additions & 0 deletions test/bufconn/bufconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,122 @@ func TestCloseWhileAccepting(t *testing.T) {
t.Fatalf("c, err = %v, %v; want nil, %v", c, err, errClosed)
}
}

func TestDeadline(t *testing.T) {
sig := make(chan error)
blockingWrite := func(conn net.Conn) {
_, err := conn.Write([]byte("0123456789"))
sig <- err
}

blockingRead := func(conn net.Conn) {
_, err := conn.Read(make([]byte, 10))
sig <- err
}

p1, p2 := newPipe(5), newPipe(5)
c1, c2 := &conn{p1, p1}, &conn{p2, p2}
defer c1.Close()
defer c2.Close()

// Test with deadline
c1.SetWriteDeadline(time.Now())

go blockingWrite(c1)
select {
case <-time.After(100 * time.Millisecond):
t.Fatalf("Write timeout timed out, c = %v", c1)
case err := <-sig:
if netErr, ok := err.(net.Error); ok {
if !netErr.Timeout() {
t.Fatalf("Write returned unexpected error, c = %v, err = %v", c1, netErr)
}
} else {
t.Fatalf("Write returned unexpected error, c = %v, err = %v", c1, err)
}
}

c2.SetReadDeadline(time.Now())

go blockingRead(c2)
select {
case <-time.After(100 * time.Millisecond):
t.Fatalf("Read timeout timed out, c = %v", c2)
case err := <-sig:
if netErr, ok := err.(net.Error); ok {
if !netErr.Timeout() {
t.Fatalf("Read returned unexpected error, c = %v, err = %v", c2, netErr)
}
} else {
t.Fatalf("Read returned unexpected error, c = %v, err = %v", c2, err)
}
}

// Test timing out pending reads/writes
c1.SetWriteDeadline(time.Time{})
c2.SetReadDeadline(time.Time{})

go blockingWrite(c1)
select {
case <-time.After(100 * time.Millisecond):
case err := <-sig:
t.Fatalf("Write returned before timeout, err = %v", err)
}

c1.SetWriteDeadline(time.Now())
select {
case <-time.After(100 * time.Millisecond):
t.Fatalf("Write timeout timed out, c = %v", c1)
case err := <-sig:
if netErr, ok := err.(net.Error); ok {
if !netErr.Timeout() {
t.Fatalf("Write returned unexpected error, c = %v, err = %v", c1, netErr)
}
} else {
t.Fatalf("Write returned unexpected error, c = %v, err = %v", c1, err)
}
}

go blockingRead(c2)
select {
case <-time.After(100 * time.Millisecond):
case err := <-sig:
t.Fatalf("Read returned before timeout, err = %v", err)
}

c2.SetReadDeadline(time.Now())
select {
case <-time.After(100 * time.Millisecond):
t.Fatalf("Read timeout timed out, c = %v", c2)
case err := <-sig:
if netErr, ok := err.(net.Error); ok {
if !netErr.Timeout() {
t.Fatalf("Read returned unexpected error, c = %v, err = %v", c2, netErr)
}
} else {
t.Fatalf("Read returned unexpected error, c = %v, err = %v", c2, err)
}
}

// Test non-blocking read/write
c1, c2 = &conn{p1, p2}, &conn{p2, p1}

c1.SetWriteDeadline(time.Now().Add(10 * time.Second))
c2.SetReadDeadline(time.Now().Add(10 * time.Second))

// Not blocking here
go blockingWrite(c1)
go blockingRead(c2)

// Read response from both routines
for i := 0; i < 2; i++ {
select {
case <-time.After(100 * time.Millisecond):
t.Fatalf("Read/Write timed out, c1 = %v, c2 = %v", c1, c2)
case err := <-sig:
if err != nil {
t.Fatalf("Read/Write failed to complete, c1 = %v, c2 = %v, err = %v", c1, c2, err)
}
}
}
}

0 comments on commit 3bb34e5

Please sign in to comment.