Skip to content

Commit

Permalink
interim commit
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Oct 12, 2023
1 parent 8593351 commit 854b77c
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 101 deletions.
266 changes: 166 additions & 100 deletions p2p/test/transport/gating_test.go
Expand Up @@ -217,15 +217,13 @@ func TestInterceptAccept(t *testing.T) {
}
for _, tc := range transportsToTest {
t.Run(tc.Name, func(t *testing.T) {
if strings.Contains(tc.Name, "WebRTCPrivate") {
testInterceptAcceptIncomingWebRTCPrivate(t, tc)
return
}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
connGater := NewMockConnectionGater(ctrl)
if strings.Contains(tc.Name, "WebRTCPrivate") {
connGater.EXPECT().InterceptPeerDial(gomock.Any()).Return(true)
connGater.EXPECT().InterceptAddrDial(gomock.Any(), gomock.Any()).Return(true)
connGater.EXPECT().InterceptSecured(gomock.Any(), gomock.Any(), gomock.Any()).Return(true)
connGater.EXPECT().InterceptUpgraded(gomock.Any()).Return(true, control.DisconnectReason(0))
}

h1 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true})
h2 := tc.HostGenerator(t, TransportTestCaseOpts{ConnGater: connGater})
Expand All @@ -236,16 +234,7 @@ func TestInterceptAccept(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// The basic host dials the first connection.
if strings.Contains(tc.Name, "WebRTCPrivate") {

// relayed connection
connGater.EXPECT().InterceptAccept(gomock.Any()).Return(true)
connGater.EXPECT().InterceptSecured(gomock.Any(), gomock.Any(), gomock.Any()).Return(true)
connGater.EXPECT().InterceptUpgraded(gomock.Any()).Return(true, control.DisconnectReason(0))

// TODO: Fix webrtc addresses on both sides.
connGater.EXPECT().InterceptAccept(gomock.Any())
} else if strings.Contains(tc.Name, "WebRTC") {
if strings.Contains(tc.Name, "WebRTC") {
// In WebRTC, retransmissions of the STUN packet might cause us to create multiple connections,
// if the first connection attempt is rejected.
connGater.EXPECT().InterceptAccept(gomock.Any()).Do(func(addrs network.ConnMultiaddrs) {
Expand All @@ -271,136 +260,213 @@ func TestInterceptAccept(t *testing.T) {
}
}

func testInterceptAcceptIncomingWebRTCPrivate(t *testing.T, tc TransportTestCase) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
connGater := NewMockConnectionGater(ctrl)

// Relay reservation calls
connGater.EXPECT().InterceptPeerDial(gomock.Any()).Return(true)
connGater.EXPECT().InterceptAddrDial(gomock.Any(), gomock.Any()).Return(true)
connGater.EXPECT().InterceptSecured(gomock.Any(), gomock.Any(), gomock.Any()).Return(true)
connGater.EXPECT().InterceptUpgraded(gomock.Any()).Return(true, control.DisconnectReason(0))

h1 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true})
h2 := tc.HostGenerator(t, TransportTestCaseOpts{ConnGater: connGater})
defer h1.Close()
defer h2.Close()
require.Len(t, h2.Addrs(), 1)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// relayed connection for incoming stream
connGater.EXPECT().InterceptAccept(gomock.Any()).Return(true)
connGater.EXPECT().InterceptSecured(gomock.Any(), gomock.Any(), gomock.Any()).Return(true)
connGater.EXPECT().InterceptUpgraded(gomock.Any()).Return(true, control.DisconnectReason(0))
// webrtc connection accept
// TODO: Fix webrtc addresses on both sides.
connGater.EXPECT().InterceptAccept(gomock.Any())

// The basic host dials the first connection.
h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), time.Hour)
_, err := h1.NewStream(ctx, h2.ID(), protocol.TestingID)
require.Error(t, err)
if _, err := h2.Addrs()[0].ValueForProtocol(ma.P_WEBRTC_DIRECT); err != nil {
// WebRTC rejects connection attempt before an error can be sent to the client.
// This means that the connection attempt will time out.
require.NotErrorIs(t, err, context.DeadlineExceeded)
}
}

func TestInterceptSecuredIncoming(t *testing.T) {
if race.WithRace() {
t.Skip("The upgrader spawns a new Go routine, which leads to race conditions when using GoMock.")
}
for _, tc := range transportsToTest {
t.Run(tc.Name, func(t *testing.T) {
h1 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true})

if strings.Contains(tc.Name, "WebRTCPrivate") {
testInterceptSecuredIncomingWebRTCPrivate(t, tc)
return
}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
connGater := NewMockConnectionGater(ctrl)
if strings.Contains(tc.Name, "WebRTCPrivate") {
gomock.InOrder(
connGater.EXPECT().InterceptPeerDial(gomock.Any()).Return(true),
connGater.EXPECT().InterceptAddrDial(gomock.Any(), gomock.Any()).Return(true),
connGater.EXPECT().InterceptSecured(gomock.Any(), gomock.Any(), gomock.Any()).Return(true),
connGater.EXPECT().InterceptUpgraded(gomock.Any()).Return(true, control.DisconnectReason(0)),

// relayed connection
connGater.EXPECT().InterceptAccept(gomock.Any()).Return(true),
connGater.EXPECT().InterceptSecured(gomock.Any(), gomock.Any(), gomock.Any()).Return(true),
connGater.EXPECT().InterceptUpgraded(gomock.Any()).Return(true, control.DisconnectReason(0)),

// TODO: Fix webrtc addresses on both sides.
connGater.EXPECT().InterceptAccept(gomock.Any()).Return(true),
connGater.EXPECT().InterceptSecured(gomock.Any(), gomock.Any(), gomock.Any()),
)
}

h1 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true})
h2 := tc.HostGenerator(t, TransportTestCaseOpts{ConnGater: connGater})
defer h1.Close()
defer h2.Close()

require.Len(t, h2.Addrs(), 1)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if strings.Contains(tc.Name, "WebRTCPrivate") {
ctx = network.WithForceDirectDial(ctx, "transport integration test /webrtc")
} else {
gomock.InOrder(
connGater.EXPECT().InterceptAccept(gomock.Any()).Return(true),
connGater.EXPECT().InterceptSecured(network.DirInbound, h1.ID(), gomock.Any()).Do(func(_ network.Direction, _ peer.ID, addrs network.ConnMultiaddrs) {
// remove the certhash component from WebTransport addresses
require.Equal(t, stripCertHash(h2.Addrs()[0]), addrs.LocalMultiaddr())
}),
)
}

gomock.InOrder(
connGater.EXPECT().InterceptAccept(gomock.Any()).Return(true),
connGater.EXPECT().InterceptSecured(network.DirInbound, h1.ID(), gomock.Any()).Do(func(_ network.Direction, _ peer.ID, addrs network.ConnMultiaddrs) {
// remove the certhash component from WebTransport addresses
require.Equal(t, stripCertHash(h2.Addrs()[0]), addrs.LocalMultiaddr())

}),
)

h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), time.Hour)
_, err := h1.NewStream(ctx, h2.ID(), protocol.TestingID)
require.Error(t, err)

// WebRTCPrivate connection establishment is considered complete when the DTLS handshake finishes.
// At this point no SCTP association is established. Closing the connection on the listener side,
// immediately after accepting, closes the listener side of the connection before the SCTP association
// is established. Pion doesn't handle this case nicely and webrtc.PeerConnection on the dialer will
// only be considered closed once the ICE transport times out.
if !strings.Contains(tc.Name, "WebRTCPrivate") {
require.NotErrorIs(t, err, context.DeadlineExceeded)
}
require.NotErrorIs(t, err, context.DeadlineExceeded)
})
}
}

func testInterceptSecuredIncomingWebRTCPrivate(t *testing.T, tc TransportTestCase) {
t.Helper()

ctrl := gomock.NewController(t)
defer ctrl.Finish()
connGater := NewMockConnectionGater(ctrl)
if strings.Contains(tc.Name, "WebRTCPrivate") {
gomock.InOrder(
// Dial to relay node for circuit reservation
connGater.EXPECT().InterceptPeerDial(gomock.Any()).Return(true),
connGater.EXPECT().InterceptAddrDial(gomock.Any(), gomock.Any()).Return(true),
connGater.EXPECT().InterceptSecured(gomock.Any(), gomock.Any(), gomock.Any()).Return(true),
connGater.EXPECT().InterceptUpgraded(gomock.Any()).Return(true, control.DisconnectReason(0)),
// Incoming relay connection for signaling stream
connGater.EXPECT().InterceptAccept(gomock.Any()).Return(true),
connGater.EXPECT().InterceptSecured(gomock.Any(), gomock.Any(), gomock.Any()).Return(true),
connGater.EXPECT().InterceptUpgraded(gomock.Any()).Return(true, control.DisconnectReason(0)),
)
}
h1 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true})
h2 := tc.HostGenerator(t, TransportTestCaseOpts{ConnGater: connGater})
defer h1.Close()
defer h2.Close()

require.Len(t, h2.Addrs(), 1)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
ctx = network.WithForceDirectDial(ctx, "transport integration test /webrtc")
gomock.InOrder(
connGater.EXPECT().InterceptAccept(gomock.Any()).Return(true),
connGater.EXPECT().InterceptSecured(network.DirInbound, h1.ID(), gomock.Any()),
)

h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), time.Hour)
_, err := h1.NewStream(ctx, h2.ID(), protocol.TestingID)
require.Error(t, err)

// Do not check that error is Context Deadline Exceeded
// WebRTCPrivate connection establishment is considered complete when the DTLS handshake finishes.
// At this point no SCTP association is established. Closing the connection on the listener side,
// immediately after accepting, closes the listener side of the connection before the SCTP association
// is established. Pion doesn't handle this case nicely and webrtc.PeerConnection on the dialer will
// only be considered closed once the ICE transport times out.
}

func TestInterceptUpgradedIncoming(t *testing.T) {
if race.WithRace() {
t.Skip("The upgrader spawns a new Go routine, which leads to race conditions when using GoMock.")
}
for _, tc := range transportsToTest {
t.Run(tc.Name, func(t *testing.T) {
if strings.Contains(tc.Name, "WebRTCPrivate") {
testInterceptUpgradeIncomingWebRTCPrivate(t, tc)
return
}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
connGater := NewMockConnectionGater(ctrl)

h1 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true})

if strings.Contains(tc.Name, "WebRTCPrivate") {
gomock.InOrder(
connGater.EXPECT().InterceptPeerDial(gomock.Any()).Return(true),
connGater.EXPECT().InterceptAddrDial(gomock.Any(), gomock.Any()).Return(true),
connGater.EXPECT().InterceptSecured(gomock.Any(), gomock.Any(), gomock.Any()).Return(true),
connGater.EXPECT().InterceptUpgraded(gomock.Any()).Return(true, control.DisconnectReason(0)),
)
}

h2 := tc.HostGenerator(t, TransportTestCaseOpts{ConnGater: connGater})
defer h1.Close()
defer h2.Close()

if strings.Contains(tc.Name, "WebRTCPrivate") {
gomock.InOrder(
// relayed connection
connGater.EXPECT().InterceptAccept(gomock.Any()).Return(true),
connGater.EXPECT().InterceptSecured(gomock.Any(), gomock.Any(), gomock.Any()).Return(true),
connGater.EXPECT().InterceptUpgraded(gomock.Any()).Return(true, control.DisconnectReason(0)),

// TODO: Fix webrtc addresses on both sides.
connGater.EXPECT().InterceptAccept(gomock.Any()).Return(true),
connGater.EXPECT().InterceptSecured(gomock.Any(), gomock.Any(), gomock.Any()).Return(true),
connGater.EXPECT().InterceptUpgraded(gomock.Any()).Do(func(c network.Conn) {
require.Equal(t, h1.ID(), c.RemotePeer())
require.Equal(t, h2.ID(), c.LocalPeer())
}),
)
}
require.Len(t, h2.Addrs(), 1)

gomock.InOrder(
connGater.EXPECT().InterceptAccept(gomock.Any()).Return(true),
connGater.EXPECT().InterceptSecured(network.DirInbound, h1.ID(), gomock.Any()).Return(true),
connGater.EXPECT().InterceptUpgraded(gomock.Any()).Do(func(c network.Conn) {
// remove the certhash component from WebTransport addresses
require.Equal(t, stripCertHash(h2.Addrs()[0]), c.LocalMultiaddr())

require.Equal(t, h1.ID(), c.RemotePeer())
require.Equal(t, h2.ID(), c.LocalPeer())
}),
)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if strings.Contains(tc.Name, "WebRTCPrivate") {
ctx = network.WithForceDirectDial(ctx, "transport integration test /webrtc")
} else {
gomock.InOrder(
connGater.EXPECT().InterceptAccept(gomock.Any()).Return(true),
connGater.EXPECT().InterceptSecured(network.DirInbound, h1.ID(), gomock.Any()).Return(true),
connGater.EXPECT().InterceptUpgraded(gomock.Any()).Do(func(c network.Conn) {
// remove the certhash component from WebTransport addresses
require.Equal(t, stripCertHash(h2.Addrs()[0]), c.LocalMultiaddr())
require.Equal(t, h1.ID(), c.RemotePeer())
require.Equal(t, h2.ID(), c.LocalPeer())
}),
)
}
h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), time.Hour)
_, err := h1.NewStream(ctx, h2.ID(), protocol.TestingID)
require.Error(t, err)

// see comment at the corresponding line in `TestInterceptUpgradedIncoming`
if !strings.Contains(tc.Name, "WebRTCPrivate") {
require.NotErrorIs(t, err, context.DeadlineExceeded)
}
require.NotErrorIs(t, err, context.DeadlineExceeded)
})
}
}

func testInterceptUpgradeIncomingWebRTCPrivate(t *testing.T, tc TransportTestCase) {
t.Helper()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
connGater := NewMockConnectionGater(ctrl)

gomock.InOrder(
// Dial to relay node for circuit reservation
connGater.EXPECT().InterceptPeerDial(gomock.Any()).Return(true),
connGater.EXPECT().InterceptAddrDial(gomock.Any(), gomock.Any()).Return(true),
connGater.EXPECT().InterceptSecured(gomock.Any(), gomock.Any(), gomock.Any()).Return(true),
connGater.EXPECT().InterceptUpgraded(gomock.Any()).Return(true, control.DisconnectReason(0)),
// Incoming relay connection for signaling stream
connGater.EXPECT().InterceptAccept(gomock.Any()).Return(true),
connGater.EXPECT().InterceptSecured(gomock.Any(), gomock.Any(), gomock.Any()).Return(true),
connGater.EXPECT().InterceptUpgraded(gomock.Any()).Return(true, control.DisconnectReason(0)),
)

h1 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true})
h2 := tc.HostGenerator(t, TransportTestCaseOpts{ConnGater: connGater})
defer h1.Close()
defer h2.Close()

require.Len(t, h2.Addrs(), 1)

gomock.InOrder(
connGater.EXPECT().InterceptAccept(gomock.Any()).Return(true),
connGater.EXPECT().InterceptSecured(network.DirInbound, h1.ID(), gomock.Any()).Return(true),
connGater.EXPECT().InterceptUpgraded(gomock.Any()).Do(func(c network.Conn) {
require.Equal(t, h1.ID(), c.RemotePeer())
require.Equal(t, h2.ID(), c.LocalPeer())
}),
)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
ctx = network.WithForceDirectDial(ctx, "transport integration test /webrtc")

h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), time.Hour)
_, err := h1.NewStream(ctx, h2.ID(), protocol.TestingID)
require.Error(t, err)
}
2 changes: 1 addition & 1 deletion p2p/transport/webrtcprivate/transport.go
Expand Up @@ -39,7 +39,7 @@ const (
disconnectedTimeout = 20 * time.Second
failedTimeout = 30 * time.Second
keepaliveTimeout = 15 * time.Second
maxAcceptQueueLen = 10
maxAcceptQueueLen = 256
)

var (
Expand Down

0 comments on commit 854b77c

Please sign in to comment.