Skip to content

Commit

Permalink
migrate to consolidated types (#78)
Browse files Browse the repository at this point in the history
  • Loading branch information
raulk committed May 26, 2019
1 parent 172dc16 commit e92179b
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 56 deletions.
11 changes: 6 additions & 5 deletions p2p/protocol/internal/circuitv1-deprecated/conn.go
Expand Up @@ -5,16 +5,17 @@ import (
"net"
"time"

host "github.com/libp2p/go-libp2p-host"
inet "github.com/libp2p/go-libp2p-net"
pstore "github.com/libp2p/go-libp2p-peerstore"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)

type Conn struct {
stream inet.Stream
remote pstore.PeerInfo
stream network.Stream
remote peer.AddrInfo
host host.Host
}

Expand Down
17 changes: 8 additions & 9 deletions p2p/protocol/internal/circuitv1-deprecated/dial.go
Expand Up @@ -5,13 +5,12 @@ import (
"fmt"
"math/rand"

peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
tpt "github.com/libp2p/go-libp2p-transport"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/transport"
ma "github.com/multiformats/go-multiaddr"
)

func (d *RelayTransport) Dial(ctx context.Context, a ma.Multiaddr, p peer.ID) (tpt.Conn, error) {
func (d *RelayTransport) Dial(ctx context.Context, a ma.Multiaddr, p peer.ID) (transport.CapableConn, error) {
c, err := d.Relay().Dial(ctx, a, p)
if err != nil {
return nil, err
Expand All @@ -37,7 +36,7 @@ func (r *Relay) Dial(ctx context.Context, a ma.Multiaddr, p peer.ID) (*Conn, err
}
}

dinfo := &pstore.PeerInfo{ID: p, Addrs: []ma.Multiaddr{}}
dinfo := &peer.AddrInfo{ID: p, Addrs: []ma.Multiaddr{}}
if len(destaddr.Bytes()) > 0 {
dinfo.Addrs = append(dinfo.Addrs, destaddr)
}
Expand All @@ -47,16 +46,16 @@ func (r *Relay) Dial(ctx context.Context, a ma.Multiaddr, p peer.ID) (*Conn, err
return r.tryDialRelays(ctx, *dinfo)
}

var rinfo *pstore.PeerInfo
rinfo, err := pstore.InfoFromP2pAddr(relayaddr)
var rinfo *peer.AddrInfo
rinfo, err := peer.AddrInfoFromP2pAddr(relayaddr)
if err != nil {
return nil, fmt.Errorf("error parsing multiaddr '%s': %s", relayaddr.String(), err)
}

return r.DialPeer(ctx, *rinfo, *dinfo)
}

func (r *Relay) tryDialRelays(ctx context.Context, dinfo pstore.PeerInfo) (*Conn, error) {
func (r *Relay) tryDialRelays(ctx context.Context, dinfo peer.AddrInfo) (*Conn, error) {
var relays []peer.ID
r.mx.Lock()
for p := range r.relays {
Expand All @@ -76,7 +75,7 @@ func (r *Relay) tryDialRelays(ctx context.Context, dinfo pstore.PeerInfo) (*Conn
}

rctx, cancel := context.WithTimeout(ctx, HopConnectTimeout)
c, err := r.DialPeer(rctx, pstore.PeerInfo{ID: relay}, dinfo)
c, err := r.DialPeer(rctx, peer.AddrInfo{ID: relay}, dinfo)
cancel()

if err == nil {
Expand Down
4 changes: 2 additions & 2 deletions p2p/protocol/internal/circuitv1-deprecated/notify.go
Expand Up @@ -4,8 +4,8 @@ import (
"context"
"time"

inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
inet "github.com/libp2p/go-libp2p-core/network"
peer "github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
)

Expand Down
43 changes: 23 additions & 20 deletions p2p/protocol/internal/circuitv1-deprecated/relay.go
Expand Up @@ -10,13 +10,16 @@ import (

pb "github.com/libp2p/go-libp2p-circuit/pb"

logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"

pool "github.com/libp2p/go-buffer-pool"
host "github.com/libp2p/go-libp2p-host"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
tptu "github.com/libp2p/go-libp2p-transport-upgrader"

logging "github.com/ipfs/go-log"
ma "github.com/multiformats/go-multiaddr"
)

Expand Down Expand Up @@ -138,12 +141,12 @@ func (r *Relay) GetActiveHops() int32 {
return atomic.LoadInt32(&r.liveHopCount)
}

func (r *Relay) DialPeer(ctx context.Context, relay pstore.PeerInfo, dest pstore.PeerInfo) (*Conn, error) {
func (r *Relay) DialPeer(ctx context.Context, relay peer.AddrInfo, dest peer.AddrInfo) (*Conn, error) {

log.Debugf("dialing peer %s through relay %s", dest.ID, relay.ID)

if len(relay.Addrs) > 0 {
r.host.Peerstore().AddAddrs(relay.ID, relay.Addrs, pstore.TempAddrTTL)
r.host.Peerstore().AddAddrs(relay.ID, relay.Addrs, peerstore.TempAddrTTL)
}

s, err := r.host.NewStream(ctx, relay.ID, ProtoID)
Expand Down Expand Up @@ -219,7 +222,7 @@ func (r *Relay) CanHop(ctx context.Context, id peer.ID) (bool, error) {
s.Reset()
return false, err
}
if err := inet.FullClose(s); err != nil {
if err := helpers.FullClose(s); err != nil {
return false, err
}

Expand All @@ -230,7 +233,7 @@ func (r *Relay) CanHop(ctx context.Context, id peer.ID) (bool, error) {
return msg.GetCode() == pb.CircuitRelay_SUCCESS, nil
}

func (r *Relay) handleNewStream(s inet.Stream) {
func (r *Relay) handleNewStream(s network.Stream) {
log.Infof("new relay stream from: %s", s.Conn().RemotePeer())

rd := newDelimitedReader(s, maxMessageSize)
Expand All @@ -257,7 +260,7 @@ func (r *Relay) handleNewStream(s inet.Stream) {
}
}

func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) {
func (r *Relay) handleHopStream(s network.Stream, msg *pb.CircuitRelay) {
if !r.hop {
r.handleError(s, pb.CircuitRelay_HOP_CANT_SPEAK_RELAY)
return
Expand Down Expand Up @@ -300,15 +303,15 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) {
defer cancel()

if !r.active {
ctx = inet.WithNoDial(ctx, "relay hop")
ctx = network.WithNoDial(ctx, "relay hop")
} else if len(dst.Addrs) > 0 {
r.host.Peerstore().AddAddrs(dst.ID, dst.Addrs, pstore.TempAddrTTL)
r.host.Peerstore().AddAddrs(dst.ID, dst.Addrs, peerstore.TempAddrTTL)
}

bs, err := r.host.NewStream(ctx, dst.ID, ProtoID)
if err != nil {
log.Debugf("error opening relay stream to %s: %s", dst.ID.Pretty(), err.Error())
if err == inet.ErrNoConn {
if err == network.ErrNoConn {
r.handleError(s, pb.CircuitRelay_HOP_NO_CONN_TO_DST)
} else {
r.handleError(s, pb.CircuitRelay_HOP_CANT_DIAL_DST)
Expand Down Expand Up @@ -423,7 +426,7 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) {
}()
}

func (r *Relay) handleStopStream(s inet.Stream, msg *pb.CircuitRelay) {
func (r *Relay) handleStopStream(s network.Stream, msg *pb.CircuitRelay) {
src, err := peerToPeerInfo(msg.GetSrcPeer())
if err != nil {
r.handleError(s, pb.CircuitRelay_STOP_SRC_MULTIADDR_INVALID)
Expand All @@ -439,7 +442,7 @@ func (r *Relay) handleStopStream(s inet.Stream, msg *pb.CircuitRelay) {
log.Infof("relay connection from: %s", src.ID)

if len(src.Addrs) > 0 {
r.host.Peerstore().AddAddrs(src.ID, src.Addrs, pstore.TempAddrTTL)
r.host.Peerstore().AddAddrs(src.ID, src.Addrs, peerstore.TempAddrTTL)
}

select {
Expand All @@ -449,7 +452,7 @@ func (r *Relay) handleStopStream(s inet.Stream, msg *pb.CircuitRelay) {
}
}

func (r *Relay) handleCanHop(s inet.Stream, msg *pb.CircuitRelay) {
func (r *Relay) handleCanHop(s network.Stream, msg *pb.CircuitRelay) {
var err error

if r.hop {
Expand All @@ -462,22 +465,22 @@ func (r *Relay) handleCanHop(s inet.Stream, msg *pb.CircuitRelay) {
s.Reset()
log.Debugf("error writing relay response: %s", err.Error())
} else {
inet.FullClose(s)
helpers.FullClose(s)
}
}

func (r *Relay) handleError(s inet.Stream, code pb.CircuitRelay_Status) {
func (r *Relay) handleError(s network.Stream, code pb.CircuitRelay_Status) {
log.Warningf("relay error: %s (%d)", pb.CircuitRelay_Status_name[int32(code)], code)
err := r.writeResponse(s, code)
if err != nil {
s.Reset()
log.Debugf("error writing relay response: %s", err.Error())
} else {
inet.FullClose(s)
helpers.FullClose(s)
}
}

func (r *Relay) writeResponse(s inet.Stream, code pb.CircuitRelay_Status) error {
func (r *Relay) writeResponse(s network.Stream, code pb.CircuitRelay_Status) error {
wr := newDelimitedWriter(s)

var msg pb.CircuitRelay
Expand Down
3 changes: 2 additions & 1 deletion p2p/protocol/internal/circuitv1-deprecated/relay_test.go
Expand Up @@ -14,7 +14,8 @@ import (
pb "github.com/libp2p/go-libp2p-circuit/pb"

bhost "github.com/libp2p/go-libp2p-blankhost"
host "github.com/libp2p/go-libp2p-host"
"github.com/libp2p/go-libp2p-core/host"

swarm "github.com/libp2p/go-libp2p-swarm"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
ma "github.com/multiformats/go-multiaddr"
Expand Down
11 changes: 6 additions & 5 deletions p2p/protocol/internal/circuitv1-deprecated/transport.go
Expand Up @@ -4,8 +4,9 @@ import (
"context"
"fmt"

host "github.com/libp2p/go-libp2p-host"
tpt "github.com/libp2p/go-libp2p-transport"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/transport"

tptu "github.com/libp2p/go-libp2p-transport-upgrader"
ma "github.com/multiformats/go-multiaddr"
)
Expand All @@ -23,7 +24,7 @@ func init() {
ma.AddProtocol(Protocol)
}

var _ tpt.Transport = (*RelayTransport)(nil)
var _ transport.Transport = (*RelayTransport)(nil)

type RelayTransport Relay

Expand All @@ -35,7 +36,7 @@ func (r *Relay) Transport() *RelayTransport {
return (*RelayTransport)(r)
}

func (t *RelayTransport) Listen(laddr ma.Multiaddr) (tpt.Listener, error) {
func (t *RelayTransport) Listen(laddr ma.Multiaddr) (transport.Listener, error) {
// TODO: Ensure we have a connection to the relay, if specified. Also,
// make sure the multiaddr makes sense.
if !t.Relay().Matches(laddr) {
Expand All @@ -58,7 +59,7 @@ func (t *RelayTransport) Protocols() []int {

// AddRelayTransport constructs a relay and adds it as a transport to the host network.
func AddRelayTransport(ctx context.Context, h host.Host, upgrader *tptu.Upgrader, opts ...RelayOpt) error {
n, ok := h.Network().(tpt.Network)
n, ok := h.Network().(transport.TransportNetwork)
if !ok {
return fmt.Errorf("%v is not a transport network", h.Network())
}
Expand Down
15 changes: 8 additions & 7 deletions p2p/protocol/internal/circuitv1-deprecated/transport_test.go
Expand Up @@ -10,9 +10,10 @@ import (

. "github.com/libp2p/go-libp2p-circuit"

host "github.com/libp2p/go-libp2p-host"
inet "github.com/libp2p/go-libp2p-net"
pstore "github.com/libp2p/go-libp2p-peerstore"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peerstore"

swarm "github.com/libp2p/go-libp2p-swarm"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -45,7 +46,7 @@ func testSetupRelay(t *testing.T, ctx context.Context) []host.Host {

time.Sleep(100 * time.Millisecond)

handler := func(s inet.Stream) {
handler := func(s network.Stream) {
_, err := s.Write(msg)
if err != nil {
t.Error(err)
Expand All @@ -72,7 +73,7 @@ func TestFullAddressTransportDial(t *testing.T) {
rctx, rcancel := context.WithTimeout(ctx, time.Second)
defer rcancel()

hosts[0].Peerstore().AddAddrs(hosts[2].ID(), []ma.Multiaddr{addr}, pstore.TempAddrTTL)
hosts[0].Peerstore().AddAddrs(hosts[2].ID(), []ma.Multiaddr{addr}, peerstore.TempAddrTTL)

s, err := hosts[0].NewStream(rctx, hosts[2].ID(), TestProto)
if err != nil {
Expand Down Expand Up @@ -103,7 +104,7 @@ func TestSpecificRelayTransportDial(t *testing.T) {
rctx, rcancel := context.WithTimeout(ctx, time.Second)
defer rcancel()

hosts[0].Peerstore().AddAddrs(hosts[2].ID(), []ma.Multiaddr{addr}, pstore.TempAddrTTL)
hosts[0].Peerstore().AddAddrs(hosts[2].ID(), []ma.Multiaddr{addr}, peerstore.TempAddrTTL)

s, err := hosts[0].NewStream(rctx, hosts[2].ID(), TestProto)
if err != nil {
Expand Down Expand Up @@ -134,7 +135,7 @@ func TestUnspecificRelayTransportDial(t *testing.T) {
rctx, rcancel := context.WithTimeout(ctx, time.Second)
defer rcancel()

hosts[0].Peerstore().AddAddrs(hosts[2].ID(), []ma.Multiaddr{addr}, pstore.TempAddrTTL)
hosts[0].Peerstore().AddAddrs(hosts[2].ID(), []ma.Multiaddr{addr}, peerstore.TempAddrTTL)

s, err := hosts[0].NewStream(rctx, hosts[2].ID(), TestProto)
if err != nil {
Expand Down
14 changes: 7 additions & 7 deletions p2p/protocol/internal/circuitv1-deprecated/util.go
Expand Up @@ -7,22 +7,22 @@ import (

pb "github.com/libp2p/go-libp2p-circuit/pb"

"github.com/libp2p/go-libp2p-core/peer"

ggio "github.com/gogo/protobuf/io"
proto "github.com/gogo/protobuf/proto"
pool "github.com/libp2p/go-buffer-pool"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
)

func peerToPeerInfo(p *pb.CircuitRelay_Peer) (pstore.PeerInfo, error) {
func peerToPeerInfo(p *pb.CircuitRelay_Peer) (peer.AddrInfo, error) {
if p == nil {
return pstore.PeerInfo{}, errors.New("nil peer")
return peer.AddrInfo{}, errors.New("nil peer")
}

id, err := peer.IDFromBytes(p.Id)
if err != nil {
return pstore.PeerInfo{}, err
return peer.AddrInfo{}, err
}

addrs := make([]ma.Multiaddr, 0, len(p.Addrs))
Expand All @@ -33,10 +33,10 @@ func peerToPeerInfo(p *pb.CircuitRelay_Peer) (pstore.PeerInfo, error) {
}
}

return pstore.PeerInfo{ID: id, Addrs: addrs}, nil
return peer.AddrInfo{ID: id, Addrs: addrs}, nil
}

func peerInfoToPeer(pi pstore.PeerInfo) *pb.CircuitRelay_Peer {
func peerInfoToPeer(pi peer.AddrInfo) *pb.CircuitRelay_Peer {
addrs := make([][]byte, len(pi.Addrs))
for i, addr := range pi.Addrs {
addrs[i] = addr.Bytes()
Expand Down

0 comments on commit e92179b

Please sign in to comment.