Skip to content

Commit

Permalink
Merge pull request #76 from libp2p/fix/stream-close
Browse files Browse the repository at this point in the history
call Stream.Reset instead of Stream.Close
  • Loading branch information
Stebalien committed May 22, 2019
2 parents 7c12b6d + c5068bb commit 172dc16
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 35 deletions.
52 changes: 47 additions & 5 deletions p2p/protocol/internal/circuitv1-deprecated/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@ package relay
import (
"fmt"
"net"
"time"

host "github.com/libp2p/go-libp2p-host"
inet "github.com/libp2p/go-libp2p-net"
pstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)

type Conn struct {
inet.Stream
stream inet.Stream
remote pstore.PeerInfo
host host.Host
}

type NetAddr struct {
Expand All @@ -28,29 +31,68 @@ func (n *NetAddr) String() string {
return fmt.Sprintf("relay[%s-%s]", n.Remote, n.Relay)
}

func (c *Conn) Close() error {
c.untagHop()
return c.stream.Reset()
}

func (c *Conn) Read(buf []byte) (int, error) {
return c.stream.Read(buf)
}

func (c *Conn) Write(buf []byte) (int, error) {
return c.stream.Write(buf)
}

func (c *Conn) SetDeadline(t time.Time) error {
return c.stream.SetDeadline(t)
}

func (c *Conn) SetReadDeadline(t time.Time) error {
return c.stream.SetReadDeadline(t)
}

func (c *Conn) SetWriteDeadline(t time.Time) error {
return c.stream.SetWriteDeadline(t)
}

func (c *Conn) RemoteAddr() net.Addr {
return &NetAddr{
Relay: c.Conn().RemotePeer().Pretty(),
Relay: c.stream.Conn().RemotePeer().Pretty(),
Remote: c.remote.ID.Pretty(),
}
}

// Increment the underlying relay connection tag by 1, thus increasing its protection from
// connection pruning. This ensures that connections to relays are not accidentally closed,
// by the connection manager, taking with them all the relayed connections (that may themselves
// be protected).
func (c *Conn) tagHop() {
c.host.ConnManager().UpsertTag(c.stream.Conn().RemotePeer(), "relay-hop-stream", incrementTag)
}

// Decrement the underlying relay connection tag by 1; this is performed when we close the
// relayed connection.
func (c *Conn) untagHop() {
c.host.ConnManager().UpsertTag(c.stream.Conn().RemotePeer(), "relay-hop-stream", decrementTag)
}

// TODO: is it okay to cast c.Conn().RemotePeer() into a multiaddr? might be "user input"
func (c *Conn) RemoteMultiaddr() ma.Multiaddr {
proto := ma.ProtocolWithCode(ma.P_P2P).Name
peerid := c.Conn().RemotePeer().Pretty()
peerid := c.stream.Conn().RemotePeer().Pretty()
p2paddr := ma.StringCast(fmt.Sprintf("/%s/%s", proto, peerid))

circaddr := ma.Cast(ma.CodeToVarint(P_CIRCUIT))
return p2paddr.Encapsulate(circaddr)
}

func (c *Conn) LocalMultiaddr() ma.Multiaddr {
return c.Conn().LocalMultiaddr()
return c.stream.Conn().LocalMultiaddr()
}

func (c *Conn) LocalAddr() net.Addr {
na, err := manet.ToNetAddr(c.Conn().LocalMultiaddr())
na, err := manet.ToNetAddr(c.stream.Conn().LocalMultiaddr())
if err != nil {
log.Error("failed to convert local multiaddr to net addr:", err)
return nil
Expand Down
1 change: 1 addition & 0 deletions p2p/protocol/internal/circuitv1-deprecated/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func (d *RelayTransport) Dial(ctx context.Context, a ma.Multiaddr, p peer.ID) (t
if err != nil {
return nil, err
}
c.tagHop()
return d.upgrader.UpgradeOutbound(ctx, d, c, p)
}

Expand Down
5 changes: 3 additions & 2 deletions p2p/protocol/internal/circuitv1-deprecated/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,17 @@ func (r *Relay) Listener() *RelayListener {
func (l *RelayListener) Accept() (manet.Conn, error) {
select {
case c := <-l.incoming:
err := l.Relay().writeResponse(c.Stream, pb.CircuitRelay_SUCCESS)
err := l.Relay().writeResponse(c.stream, pb.CircuitRelay_SUCCESS)
if err != nil {
log.Debugf("error writing relay response: %s", err.Error())
c.Stream.Reset()
c.stream.Reset()
return nil, err
}

// TODO: Pretty print.
log.Infof("accepted relay connection: %q", c)

c.tagHop()
return c, nil
case <-l.ctx.Done():
return nil, l.ctx.Err()
Expand Down
17 changes: 11 additions & 6 deletions p2p/protocol/internal/circuitv1-deprecated/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,16 +116,21 @@ func NewRelay(ctx context.Context, h host.Host, upgrader *tptu.Upgrader, opts ..
return r, nil
}

// Increment the live hop count and increment the connection manager tags by 1 for the two
// sides of the hop stream. This ensures that connections with many hop streams will be protected
// from pruning, thus minimizing disruption from connection trimming in a relay node.
func (r *Relay) addLiveHop(from, to peer.ID) {
atomic.AddInt32(&r.liveHopCount, 1)
r.host.ConnManager().UpsertTag(from, "relay-hop-stream", func(v int) int { return v + 1 })
r.host.ConnManager().UpsertTag(to, "relay-hop-stream", func(v int) int { return v + 1 })
r.host.ConnManager().UpsertTag(from, "relay-hop-stream", incrementTag)
r.host.ConnManager().UpsertTag(to, "relay-hop-stream", incrementTag)
}

// Decrement the live hpo count and decrement the connection manager tags for the two sides
// of the hop stream.
func (r *Relay) rmLiveHop(from, to peer.ID) {
atomic.AddInt32(&r.liveHopCount, -1)
r.host.ConnManager().UpsertTag(from, "relay-hop-stream", func(v int) int { return v - 1 })
r.host.ConnManager().UpsertTag(to, "relay-hop-stream", func(v int) int { return v - 1 })
r.host.ConnManager().UpsertTag(from, "relay-hop-stream", decrementTag)
r.host.ConnManager().UpsertTag(to, "relay-hop-stream", decrementTag)

}

Expand Down Expand Up @@ -180,7 +185,7 @@ func (r *Relay) DialPeer(ctx context.Context, relay pstore.PeerInfo, dest pstore
return nil, RelayError{msg.GetCode()}
}

return &Conn{Stream: s, remote: dest}, nil
return &Conn{stream: s, remote: dest, host: r.host}, nil
}

func (r *Relay) Matches(addr ma.Multiaddr) bool {
Expand Down Expand Up @@ -438,7 +443,7 @@ func (r *Relay) handleStopStream(s inet.Stream, msg *pb.CircuitRelay) {
}

select {
case r.incoming <- &Conn{Stream: s, remote: src}:
case r.incoming <- &Conn{stream: s, remote: src, host: r.host}:
case <-time.After(RelayAcceptTimeout):
r.handleError(s, pb.CircuitRelay_STOP_RELAY_REFUSED)
}
Expand Down

0 comments on commit 172dc16

Please sign in to comment.