Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add optional simulated addresses to pipeconn and inmemorylistener #1449

Merged
merged 2 commits into from Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
49 changes: 43 additions & 6 deletions fasthttputil/inmemory_listener.go
Expand Up @@ -14,9 +14,11 @@ var ErrInmemoryListenerClosed = errors.New("InmemoryListener is already closed:
// It may be used either for fast in-process client<->server communications
// without network stack overhead or for client<->server tests.
type InmemoryListener struct {
lock sync.Mutex
closed bool
conns chan acceptConn
lock sync.Mutex
closed bool
conns chan acceptConn
listenerAddr net.Addr
addrLock sync.RWMutex
}

type acceptConn struct {
Expand All @@ -31,6 +33,14 @@ func NewInmemoryListener() *InmemoryListener {
}
}

// SetLocalAddr sets the (simulated) local address for the listener.
func (ln *InmemoryListener) SetLocalAddr(localAddr net.Addr) {
ln.addrLock.Lock()
defer ln.addrLock.Unlock()

ln.listenerAddr = localAddr
}

// Accept implements net.Listener's Accept.
//
// It is safe calling Accept from concurrently running goroutines.
Expand Down Expand Up @@ -60,12 +70,26 @@ func (ln *InmemoryListener) Close() error {
return err
}

type inmemoryAddr int

func (inmemoryAddr) Network() string {
return "inmemory"
}

func (inmemoryAddr) String() string {
return "InmemoryListener"
}

// Addr implements net.Listener's Addr.
func (ln *InmemoryListener) Addr() net.Addr {
return &net.UnixAddr{
Name: "InmemoryListener",
Net: "memory",
ln.addrLock.RLock()
defer ln.addrLock.RUnlock()

if ln.listenerAddr != nil {
return ln.listenerAddr
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do SetLocalAddr() and Addr() need to be considered for concurrency?

Copy link
Contributor Author

@tobikris tobikris Dec 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could argue that it needs to be. However, I thought the only meaningful way to set the address would be to do it before the listener is used in any way. I just did not add it to NewInmemoryListener() to leave this completely optional.

It is probably better to guard against data races, though, and to do not leave this to the caller. I will implement it, thanks!

}

return inmemoryAddr(0)
}

// Dial creates new client<->server connection.
Expand All @@ -74,7 +98,20 @@ func (ln *InmemoryListener) Addr() net.Addr {
//
// It is safe calling Dial from concurrently running goroutines.
func (ln *InmemoryListener) Dial() (net.Conn, error) {
return ln.DialWithLocalAddr(nil)
}

// DialWithLocalAddr creates new client<->server connection.
// Just like a real Dial it only returns once the server
// has accepted the connection. The local address of the
// client connection can be set with local.
//
// It is safe calling Dial from concurrently running goroutines.
func (ln *InmemoryListener) DialWithLocalAddr(local net.Addr) (net.Conn, error) {
pc := NewPipeConns()

pc.SetAddresses(local, ln.Addr(), ln.Addr(), local)

cConn := pc.Conn1()
sConn := pc.Conn2()
ln.lock.Lock()
Expand Down
93 changes: 93 additions & 0 deletions fasthttputil/inmemory_listener_test.go
Expand Up @@ -189,3 +189,96 @@ func TestInmemoryListenerHTTPConcurrent(t *testing.T) {
wg.Wait()
})
}

func acceptLoop(ln net.Listener) {
for {
conn, err := ln.Accept()
if err != nil {
panic(err)
}

conn.Close()
}
}

func TestInmemoryListenerAddrDefault(t *testing.T) {
t.Parallel()

ln := NewInmemoryListener()

verifyAddr(t, ln.Addr(), inmemoryAddr(0))

go func() {
c, err := ln.Dial()
if err != nil {
panic(err)
}

c.Close()
}()

lc, err := ln.Accept()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

verifyAddr(t, lc.LocalAddr(), inmemoryAddr(0))
verifyAddr(t, lc.RemoteAddr(), pipeAddr(0))

go acceptLoop(ln)

c, err := ln.Dial()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

verifyAddr(t, c.LocalAddr(), pipeAddr(0))
verifyAddr(t, c.RemoteAddr(), inmemoryAddr(0))
}

func verifyAddr(t *testing.T, got, expected net.Addr) {
if got != expected {
t.Fatalf("unexpected addr: %v. Expecting %v", got, expected)
}
}

func TestInmemoryListenerAddrCustom(t *testing.T) {
t.Parallel()

ln := NewInmemoryListener()

listenerAddr := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 12345}

ln.SetLocalAddr(listenerAddr)

verifyAddr(t, ln.Addr(), listenerAddr)

go func() {
c, err := ln.Dial()
if err != nil {
panic(err)
}

c.Close()
}()

lc, err := ln.Accept()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

verifyAddr(t, lc.LocalAddr(), listenerAddr)
verifyAddr(t, lc.RemoteAddr(), pipeAddr(0))

go acceptLoop(ln)

clientAddr := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 65432}

c, err := ln.DialWithLocalAddr(clientAddr)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

verifyAddr(t, c.LocalAddr(), clientAddr)
verifyAddr(t, c.RemoteAddr(), listenerAddr)
}
33 changes: 33 additions & 0 deletions fasthttputil/pipeconns.go
Expand Up @@ -48,6 +48,21 @@ type PipeConns struct {
stopChLock sync.Mutex
}

// SetAddresses sets the local and remote addresses for the connection.
func (pc *PipeConns) SetAddresses(localAddr1, remoteAddr1, localAddr2, remoteAddr2 net.Addr) {
pc.c1.addrLock.Lock()
defer pc.c1.addrLock.Unlock()

pc.c2.addrLock.Lock()
defer pc.c2.addrLock.Unlock()

pc.c1.localAddr = localAddr1
pc.c1.remoteAddr = remoteAddr1

pc.c2.localAddr = localAddr2
pc.c2.remoteAddr = remoteAddr2
}

// Conn1 returns the first end of bi-directional pipe.
//
// Data written to Conn1 may be read from Conn2.
Expand Down Expand Up @@ -92,6 +107,10 @@ type pipeConn struct {
writeDeadlineCh <-chan time.Time

readDeadlineChLock sync.Mutex

localAddr net.Addr
remoteAddr net.Addr
addrLock sync.RWMutex
}

func (c *pipeConn) Write(p []byte) (int, error) {
Expand Down Expand Up @@ -224,10 +243,24 @@ func (c *pipeConn) Close() error {
}

func (c *pipeConn) LocalAddr() net.Addr {
c.addrLock.RLock()
defer c.addrLock.RUnlock()

if c.localAddr != nil {
return c.localAddr
}

return pipeAddr(0)
}

func (c *pipeConn) RemoteAddr() net.Addr {
c.addrLock.RLock()
defer c.addrLock.RUnlock()

if c.remoteAddr != nil {
return c.remoteAddr
}

return pipeAddr(0)
}

Expand Down
48 changes: 48 additions & 0 deletions fasthttputil/pipeconns_test.go
Expand Up @@ -357,3 +357,51 @@ func testConcurrency(t *testing.T, concurrency int, f func(*testing.T)) {
}
}
}

func TestPipeConnsAddrDefault(t *testing.T) {
t.Parallel()

pc := NewPipeConns()
c1 := pc.Conn1()

if c1.LocalAddr() != pipeAddr(0) {
t.Fatalf("unexpected local address: %v", c1.LocalAddr())
}

if c1.RemoteAddr() != pipeAddr(0) {
t.Fatalf("unexpected remote address: %v", c1.RemoteAddr())
}
}

func TestPipeConnsAddrCustom(t *testing.T) {
t.Parallel()

pc := NewPipeConns()

addr1 := &net.TCPAddr{IP: net.IPv4(1, 2, 3, 4), Port: 1234}
addr2 := &net.TCPAddr{IP: net.IPv4(5, 6, 7, 8), Port: 5678}
addr3 := &net.TCPAddr{IP: net.IPv4(9, 10, 11, 12), Port: 9012}
addr4 := &net.TCPAddr{IP: net.IPv4(13, 14, 15, 16), Port: 3456}

pc.SetAddresses(addr1, addr2, addr3, addr4)

c1 := pc.Conn1()

if c1.LocalAddr() != addr1 {
t.Fatalf("unexpected local address: %v", c1.LocalAddr())
}

if c1.RemoteAddr() != addr2 {
t.Fatalf("unexpected remote address: %v", c1.RemoteAddr())
}

c2 := pc.Conn1()

if c2.LocalAddr() != addr1 {
t.Fatalf("unexpected local address: %v", c2.LocalAddr())
}

if c2.RemoteAddr() != addr2 {
t.Fatalf("unexpected remote address: %v", c2.RemoteAddr())
}
}