From 07711a7959b1e838bf631758c716d8eae02e02ee Mon Sep 17 00:00:00 2001 From: Tobias Krischer Date: Wed, 30 Nov 2022 18:50:52 +0100 Subject: [PATCH 1/2] add optional simulated addresses to pipeconn and inmemorylistener --- fasthttputil/inmemory_listener.go | 42 ++++++++++-- fasthttputil/inmemory_listener_test.go | 93 ++++++++++++++++++++++++++ fasthttputil/pipeconns.go | 20 ++++++ fasthttputil/pipeconns_test.go | 48 +++++++++++++ 4 files changed, 197 insertions(+), 6 deletions(-) diff --git a/fasthttputil/inmemory_listener.go b/fasthttputil/inmemory_listener.go index 87f8b62746..319b734ac4 100644 --- a/fasthttputil/inmemory_listener.go +++ b/fasthttputil/inmemory_listener.go @@ -14,9 +14,10 @@ var ErrInmemoryListenerClosed = errors.New("InmemoryListener is already closed: // It may be used either for fast in-process client<->server communications // without network stack overhead or for client<->server tests. type InmemoryListener struct { - lock sync.Mutex - closed bool - conns chan acceptConn + lock sync.Mutex + closed bool + conns chan acceptConn + listenerAddr net.Addr } type acceptConn struct { @@ -31,6 +32,11 @@ func NewInmemoryListener() *InmemoryListener { } } +// SetLocalAddr sets the (simulated) local address for the listener. +func (ln *InmemoryListener) SetLocalAddr(localAddr net.Addr) { + ln.listenerAddr = localAddr +} + // Accept implements net.Listener's Accept. // // It is safe calling Accept from concurrently running goroutines. @@ -60,12 +66,23 @@ func (ln *InmemoryListener) Close() error { return err } +type inmemoryAddr int + +func (inmemoryAddr) Network() string { + return "inmemory" +} + +func (inmemoryAddr) String() string { + return "InmemoryListener" +} + // Addr implements net.Listener's Addr. func (ln *InmemoryListener) Addr() net.Addr { - return &net.UnixAddr{ - Name: "InmemoryListener", - Net: "memory", + if ln.listenerAddr != nil { + return ln.listenerAddr } + + return inmemoryAddr(0) } // Dial creates new client<->server connection. @@ -74,7 +91,20 @@ func (ln *InmemoryListener) Addr() net.Addr { // // It is safe calling Dial from concurrently running goroutines. func (ln *InmemoryListener) Dial() (net.Conn, error) { + return ln.DialWithLocalAddr(nil) +} + +// DialWithLocalAddr creates new client<->server connection. +// Just like a real Dial it only returns once the server +// has accepted the connection. The local address of the +// client connection can be set with local. +// +// It is safe calling Dial from concurrently running goroutines. +func (ln *InmemoryListener) DialWithLocalAddr(local net.Addr) (net.Conn, error) { pc := NewPipeConns() + + pc.SetAddresses(local, ln.Addr(), ln.Addr(), local) + cConn := pc.Conn1() sConn := pc.Conn2() ln.lock.Lock() diff --git a/fasthttputil/inmemory_listener_test.go b/fasthttputil/inmemory_listener_test.go index 8a8d16acc9..e9d5125202 100644 --- a/fasthttputil/inmemory_listener_test.go +++ b/fasthttputil/inmemory_listener_test.go @@ -189,3 +189,96 @@ func TestInmemoryListenerHTTPConcurrent(t *testing.T) { wg.Wait() }) } + +func acceptLoop(ln net.Listener) { + for { + conn, err := ln.Accept() + if err != nil { + panic(err) + } + + conn.Close() + } +} + +func TestInmemoryListenerAddrDefault(t *testing.T) { + t.Parallel() + + ln := NewInmemoryListener() + + verifyAddr(t, ln.Addr(), inmemoryAddr(0)) + + go func() { + c, err := ln.Dial() + if err != nil { + panic(err) + } + + c.Close() + }() + + lc, err := ln.Accept() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + verifyAddr(t, lc.LocalAddr(), inmemoryAddr(0)) + verifyAddr(t, lc.RemoteAddr(), pipeAddr(0)) + + go acceptLoop(ln) + + c, err := ln.Dial() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + verifyAddr(t, c.LocalAddr(), pipeAddr(0)) + verifyAddr(t, c.RemoteAddr(), inmemoryAddr(0)) +} + +func verifyAddr(t *testing.T, got, expected net.Addr) { + if got != expected { + t.Fatalf("unexpected addr: %v. Expecting %v", got, expected) + } +} + +func TestInmemoryListenerAddrCustom(t *testing.T) { + t.Parallel() + + ln := NewInmemoryListener() + + listenerAddr := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 12345} + + ln.SetLocalAddr(listenerAddr) + + verifyAddr(t, ln.Addr(), listenerAddr) + + go func() { + c, err := ln.Dial() + if err != nil { + panic(err) + } + + c.Close() + }() + + lc, err := ln.Accept() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + verifyAddr(t, lc.LocalAddr(), listenerAddr) + verifyAddr(t, lc.RemoteAddr(), pipeAddr(0)) + + go acceptLoop(ln) + + clientAddr := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 65432} + + c, err := ln.DialWithLocalAddr(clientAddr) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + verifyAddr(t, c.LocalAddr(), clientAddr) + verifyAddr(t, c.RemoteAddr(), listenerAddr) +} diff --git a/fasthttputil/pipeconns.go b/fasthttputil/pipeconns.go index efe829f579..0f954708cd 100644 --- a/fasthttputil/pipeconns.go +++ b/fasthttputil/pipeconns.go @@ -48,6 +48,15 @@ type PipeConns struct { stopChLock sync.Mutex } +// SetAddresses sets the local and remote addresses for the connection. +func (pc *PipeConns) SetAddresses(localAddr1, remoteAddr1, localAddr2, remoteAddr2 net.Addr) { + pc.c1.localAddr = localAddr1 + pc.c1.remoteAddr = remoteAddr1 + + pc.c2.localAddr = localAddr2 + pc.c2.remoteAddr = remoteAddr2 +} + // Conn1 returns the first end of bi-directional pipe. // // Data written to Conn1 may be read from Conn2. @@ -92,6 +101,9 @@ type pipeConn struct { writeDeadlineCh <-chan time.Time readDeadlineChLock sync.Mutex + + localAddr net.Addr + remoteAddr net.Addr } func (c *pipeConn) Write(p []byte) (int, error) { @@ -224,10 +236,18 @@ func (c *pipeConn) Close() error { } func (c *pipeConn) LocalAddr() net.Addr { + if c.localAddr != nil { + return c.localAddr + } + return pipeAddr(0) } func (c *pipeConn) RemoteAddr() net.Addr { + if c.remoteAddr != nil { + return c.remoteAddr + } + return pipeAddr(0) } diff --git a/fasthttputil/pipeconns_test.go b/fasthttputil/pipeconns_test.go index f460639be6..9ac7ee14c7 100644 --- a/fasthttputil/pipeconns_test.go +++ b/fasthttputil/pipeconns_test.go @@ -357,3 +357,51 @@ func testConcurrency(t *testing.T, concurrency int, f func(*testing.T)) { } } } + +func TestPipeConnsAddrDefault(t *testing.T) { + t.Parallel() + + pc := NewPipeConns() + c1 := pc.Conn1() + + if c1.LocalAddr() != pipeAddr(0) { + t.Fatalf("unexpected local address: %v", c1.LocalAddr()) + } + + if c1.RemoteAddr() != pipeAddr(0) { + t.Fatalf("unexpected remote address: %v", c1.RemoteAddr()) + } +} + +func TestPipeConnsAddrCustom(t *testing.T) { + t.Parallel() + + pc := NewPipeConns() + + addr1 := &net.TCPAddr{IP: net.IPv4(1, 2, 3, 4), Port: 1234} + addr2 := &net.TCPAddr{IP: net.IPv4(5, 6, 7, 8), Port: 5678} + addr3 := &net.TCPAddr{IP: net.IPv4(9, 10, 11, 12), Port: 9012} + addr4 := &net.TCPAddr{IP: net.IPv4(13, 14, 15, 16), Port: 3456} + + pc.SetAddresses(addr1, addr2, addr3, addr4) + + c1 := pc.Conn1() + + if c1.LocalAddr() != addr1 { + t.Fatalf("unexpected local address: %v", c1.LocalAddr()) + } + + if c1.RemoteAddr() != addr2 { + t.Fatalf("unexpected remote address: %v", c1.RemoteAddr()) + } + + c2 := pc.Conn1() + + if c2.LocalAddr() != addr1 { + t.Fatalf("unexpected local address: %v", c2.LocalAddr()) + } + + if c2.RemoteAddr() != addr2 { + t.Fatalf("unexpected remote address: %v", c2.RemoteAddr()) + } +} From 52e407cfc22372f58465dfbc7bee1fb336ccd398 Mon Sep 17 00:00:00 2001 From: Tobias Krischer Date: Fri, 2 Dec 2022 10:56:54 +0100 Subject: [PATCH 2/2] add mutexes to addresses of pipeConn and InmemoryListener --- fasthttputil/inmemory_listener.go | 7 +++++++ fasthttputil/pipeconns.go | 13 +++++++++++++ 2 files changed, 20 insertions(+) diff --git a/fasthttputil/inmemory_listener.go b/fasthttputil/inmemory_listener.go index 319b734ac4..b0ad189e3b 100644 --- a/fasthttputil/inmemory_listener.go +++ b/fasthttputil/inmemory_listener.go @@ -18,6 +18,7 @@ type InmemoryListener struct { closed bool conns chan acceptConn listenerAddr net.Addr + addrLock sync.RWMutex } type acceptConn struct { @@ -34,6 +35,9 @@ func NewInmemoryListener() *InmemoryListener { // SetLocalAddr sets the (simulated) local address for the listener. func (ln *InmemoryListener) SetLocalAddr(localAddr net.Addr) { + ln.addrLock.Lock() + defer ln.addrLock.Unlock() + ln.listenerAddr = localAddr } @@ -78,6 +82,9 @@ func (inmemoryAddr) String() string { // Addr implements net.Listener's Addr. func (ln *InmemoryListener) Addr() net.Addr { + ln.addrLock.RLock() + defer ln.addrLock.RUnlock() + if ln.listenerAddr != nil { return ln.listenerAddr } diff --git a/fasthttputil/pipeconns.go b/fasthttputil/pipeconns.go index 0f954708cd..8a338e1fca 100644 --- a/fasthttputil/pipeconns.go +++ b/fasthttputil/pipeconns.go @@ -50,6 +50,12 @@ type PipeConns struct { // SetAddresses sets the local and remote addresses for the connection. func (pc *PipeConns) SetAddresses(localAddr1, remoteAddr1, localAddr2, remoteAddr2 net.Addr) { + pc.c1.addrLock.Lock() + defer pc.c1.addrLock.Unlock() + + pc.c2.addrLock.Lock() + defer pc.c2.addrLock.Unlock() + pc.c1.localAddr = localAddr1 pc.c1.remoteAddr = remoteAddr1 @@ -104,6 +110,7 @@ type pipeConn struct { localAddr net.Addr remoteAddr net.Addr + addrLock sync.RWMutex } func (c *pipeConn) Write(p []byte) (int, error) { @@ -236,6 +243,9 @@ func (c *pipeConn) Close() error { } func (c *pipeConn) LocalAddr() net.Addr { + c.addrLock.RLock() + defer c.addrLock.RUnlock() + if c.localAddr != nil { return c.localAddr } @@ -244,6 +254,9 @@ func (c *pipeConn) LocalAddr() net.Addr { } func (c *pipeConn) RemoteAddr() net.Addr { + c.addrLock.RLock() + defer c.addrLock.RUnlock() + if c.remoteAddr != nil { return c.remoteAddr }