Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update go-data-transfer v2 version #740

Open
wants to merge 1 commit into
base: feat/datatransfer-v2-integration
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Expand Up @@ -9,7 +9,7 @@ require (
github.com/filecoin-project/go-cbor-util v0.0.1
github.com/filecoin-project/go-commp-utils v0.1.3
github.com/filecoin-project/go-crypto v0.0.1 // indirect
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-20220603004528-681bfedccef1
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-20220731093701-aad22878e865
github.com/filecoin-project/go-ds-versioning v0.1.1
github.com/filecoin-project/go-fil-commcid v0.1.0
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0
Expand All @@ -31,7 +31,7 @@ require (
github.com/ipfs/go-cidutil v0.1.0
github.com/ipfs/go-datastore v0.5.1
github.com/ipfs/go-filestore v1.1.0
github.com/ipfs/go-graphsync v0.13.1
github.com/ipfs/go-graphsync v0.13.3-0.20220625074430-a95496cf1534
github.com/ipfs/go-ipfs-blockstore v1.1.2
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Expand Up @@ -240,6 +240,8 @@ github.com/filecoin-project/go-data-transfer v1.14.0 h1:4pnfJk8FYtqcdAg+QRGzaz57
github.com/filecoin-project/go-data-transfer v1.14.0/go.mod h1:wNJKhaLLYBJDM3VFvgvYi4iUjPa69pz/1Q5Q4HzX2wE=
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-20220603004528-681bfedccef1 h1:OeqAIz4wyLdkmjQbxSEEmj3tUA6fcH4vwmCXLMm50Mg=
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-20220603004528-681bfedccef1/go.mod h1:CKpMVsGd15In1HPfW24jFK/TIf+98iFPQ0qh1LcxyB8=
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-20220731093701-aad22878e865 h1:gFSieS2c4yinh9YB9SasmQXWYpyfoXImEXnZwSWs19c=
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-20220731093701-aad22878e865/go.mod h1:MWO2J4e4mZEa1GvVyAieSQjyWAdg3CsuVhQfkQmDXMY=
github.com/filecoin-project/go-ds-versioning v0.0.0-20211206185234-508abd7c2aff/go.mod h1:C9/l9PnB1+mwPa26BBVpCjG/XQCB0yj/q5CK2J8X1I4=
github.com/filecoin-project/go-ds-versioning v0.1.1 h1:JiyBqaQlwC+UM0WhcBtVEeT3XrX59mQhT8U3p7nu86o=
github.com/filecoin-project/go-ds-versioning v0.1.1/go.mod h1:C9/l9PnB1+mwPa26BBVpCjG/XQCB0yj/q5CK2J8X1I4=
Expand Down Expand Up @@ -580,6 +582,8 @@ github.com/ipfs/go-graphsync v0.11.0/go.mod h1:wC+c8vGVjAHthsVIl8LKr37cUra2GOaMY
github.com/ipfs/go-graphsync v0.12.0/go.mod h1:nASYWYETgsnMbQ3+DirNImOHQ8TY0a5AhAqyOY55tUg=
github.com/ipfs/go-graphsync v0.13.1 h1:lWiP/WLycoPUYyj3IDEi1GJNP30kFuYOvimcfeuZyQs=
github.com/ipfs/go-graphsync v0.13.1/go.mod h1:y8e8G6CmZeL9Srvx1l15CtGiRdf3h5JdQuqPz/iYL0A=
github.com/ipfs/go-graphsync v0.13.3-0.20220625074430-a95496cf1534 h1:sn7viAPyx3qZVhfRpXhW23mPtzl9rjJKtJ/HM/HsyZg=
github.com/ipfs/go-graphsync v0.13.3-0.20220625074430-a95496cf1534/go.mod h1:RKAui2+/HmlIVnuAXJIn0jltvOAXkl7wz3SYysmYnPI=
github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk=
github.com/ipfs/go-ipfs v0.11.0/go.mod h1:g68Thu2Ho11AWoHsN34P5fSK7iA6OWWRy3T/g8HLixc=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
Expand Down
19 changes: 4 additions & 15 deletions retrievalmarket/impl/dtutils/dtutils.go
Expand Up @@ -65,11 +65,7 @@ func providerEvent(event datatransfer.Event, channelState datatransfer.ChannelSt
// event or moving to error if a data transfer error occurs
func ProviderDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber {
return func(event datatransfer.Event, channelState datatransfer.ChannelState) {
voucher, err := channelState.Voucher()
if err != nil {
log.Errorf("received bad voucher: %s", err.Error())
return
}
voucher := channelState.Voucher()
if voucher.Voucher == nil {
log.Errorf("received empty voucher")
return
Expand Down Expand Up @@ -126,11 +122,7 @@ func clientEvent(event datatransfer.Event, channelState datatransfer.ChannelStat
case datatransfer.Cancel:
return rm.ClientEventProviderCancelled, nil
case datatransfer.NewVoucherResult:
voucher, err := channelState.LastVoucherResult()
if err != nil {
log.Errorf("received bad voucher result: %s", err.Error())
return noEvent, nil
}
voucher := channelState.LastVoucherResult()
response, err := rm.DealResponseFromNode(voucher.Voucher)
if err != nil {
log.Errorf("unexpected voucher result received: %s", err.Error())
Expand All @@ -157,11 +149,8 @@ func clientEvent(event datatransfer.Event, channelState datatransfer.ChannelStat
// an event to the appropriate state machine
func ClientDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber {
return func(event datatransfer.Event, channelState datatransfer.ChannelState) {
voucher, err := channelState.Voucher()
if err != nil {
log.Errorf("received bad voucher: %s", err.Error())
return
}
voucher := channelState.Voucher()

dealProposal, err := rm.DealProposalFromNode(voucher.Voucher)
// if this event is for a transfer not related to retrieval, ignore
if err != nil {
Expand Down
24 changes: 22 additions & 2 deletions retrievalmarket/impl/dtutils/dtutils_test.go
Expand Up @@ -9,7 +9,6 @@ import (
ds "github.com/ipfs/go-datastore"
bstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/datamodel"
peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -411,10 +410,31 @@ type fakeTransport struct{}

var _ datatransfer.Transport = (*fakeTransport)(nil)

func (ft *fakeTransport) OpenChannel(ctx context.Context, dataSender peer.ID, channelID datatransfer.ChannelID, root datamodel.Link, stor datamodel.Node, channel datatransfer.ChannelState, msg datatransfer.Message) error {
func (ft *fakeTransport) Capabilities() datatransfer.TransportCapabilities {
return datatransfer.TransportCapabilities{}
}

func (ft *fakeTransport) ID() datatransfer.TransportID {
return ""
}

func (ft *fakeTransport) Versions() []datatransfer.Version {
return nil
}

func (ft *fakeTransport) SendMessage(ctx context.Context, chid datatransfer.ChannelID, msg datatransfer.Message) error {
return nil
}

func (ft *fakeTransport) ChannelUpdated(ctx context.Context, chid datatransfer.ChannelID, message datatransfer.Message) error {
return nil
}
func (ft *fakeTransport) OpenChannel(context.Context, datatransfer.Channel, datatransfer.Request) error {
return nil
}
func (ft *fakeTransport) RestartChannel(ctx context.Context, channel datatransfer.ChannelState, req datatransfer.Request) error {
return nil
}
func (ft *fakeTransport) CloseChannel(ctx context.Context, chid datatransfer.ChannelID) error {
return nil
}
Expand Down
16 changes: 8 additions & 8 deletions retrievalmarket/impl/integration_test.go
Expand Up @@ -118,8 +118,8 @@ func requireSetupTestClientAndProvider(ctx context.Context, t *testing.T, payChA
})

gs1 := graphsyncimpl.New(ctx, network.NewFromLibp2pHost(testData.Host1), testData.LinkSystem1)
dtTransport1 := dtgstransport.NewTransport(testData.Host1.ID(), gs1)
dt1, err := dtimpl.NewDataTransfer(testData.DTStore1, testData.DTNet1, dtTransport1)
dtTransport1 := dtgstransport.NewTransport(gs1, testData.DTNet1)
dt1, err := dtimpl.NewDataTransfer(testData.DTStore1, testData.Host1.ID(), dtTransport1)
require.NoError(t, err)
tut.StartAndWaitForReadyDT(ctx, t, dt1)
require.NoError(t, err)
Expand Down Expand Up @@ -162,8 +162,8 @@ func requireSetupTestClientAndProvider(ctx context.Context, t *testing.T, payChA
paymentAddress := address.TestAddress2

gs2 := graphsyncimpl.New(ctx, network.NewFromLibp2pHost(testData.Host2), testData.LinkSystem2)
dtTransport2 := dtgstransport.NewTransport(testData.Host2.ID(), gs2)
dt2, err := dtimpl.NewDataTransfer(testData.DTStore2, testData.DTNet2, dtTransport2)
dtTransport2 := dtgstransport.NewTransport(gs2, testData.DTNet2)
dt2, err := dtimpl.NewDataTransfer(testData.DTStore2, testData.Host2.ID(), dtTransport2)
require.NoError(t, err)
tut.StartAndWaitForReadyDT(ctx, t, dt2)
require.NoError(t, err)
Expand Down Expand Up @@ -643,8 +643,8 @@ func setupClient(
})

gs1 := graphsyncimpl.New(ctx, network.NewFromLibp2pHost(testData.Host1), testData.LinkSystem1)
dtTransport1 := dtgstransport.NewTransport(testData.Host1.ID(), gs1)
dt1, err := dtimpl.NewDataTransfer(testData.DTStore1, testData.DTNet1, dtTransport1)
dtTransport1 := dtgstransport.NewTransport(gs1, testData.DTNet1)
dt1, err := dtimpl.NewDataTransfer(testData.DTStore1, testData.Host1.ID(), dtTransport1)
require.NoError(t, err)
tut.StartAndWaitForReadyDT(ctx, t, dt1)
require.NoError(t, err)
Expand Down Expand Up @@ -682,8 +682,8 @@ func setupProvider(
pieceStore.ExpectPiece(expectedPiece, pieceInfo)

gs2 := graphsyncimpl.New(ctx, network.NewFromLibp2pHost(testData.Host2), testData.LinkSystem2)
dtTransport2 := dtgstransport.NewTransport(testData.Host2.ID(), gs2)
dt2, err := dtimpl.NewDataTransfer(testData.DTStore2, testData.DTNet2, dtTransport2)
dtTransport2 := dtgstransport.NewTransport(gs2, testData.DTNet2)
dt2, err := dtimpl.NewDataTransfer(testData.DTStore2, testData.Host2.ID(), dtTransport2)
require.NoError(t, err)
tut.StartAndWaitForReadyDT(ctx, t, dt2)
require.NoError(t, err)
Expand Down
5 changes: 1 addition & 4 deletions retrievalmarket/impl/providerstates/provider_states.go
Expand Up @@ -159,10 +159,7 @@ func savePayment(ctx fsm.Context, env ProviderDealEnvironment, payment *rm.DealP
}

func processLastVoucher(ctx fsm.Context, env ProviderDealEnvironment, channelState datatransfer.ChannelState) (abi.TokenAmount, error) {
voucher, err := channelState.LastVoucher()
if err != nil {
return abi.TokenAmount{}, err
}
voucher := channelState.LastVoucher()

// read payment and return response if present
if payment, err := rm.DealPaymentFromNode(voucher.Voucher); err == nil {
Expand Down
6 changes: 2 additions & 4 deletions retrievalmarket/impl/requestvalidation/requestvalidation.go
Expand Up @@ -165,10 +165,8 @@ func (rv *ProviderRequestValidator) validatePull(receiver peer.ID, proposal *rm.

// ValidateRestart validates a request on restart, based on its current state
func (rv *ProviderRequestValidator) ValidateRestart(channelID datatransfer.ChannelID, channelState datatransfer.ChannelState) (datatransfer.ValidationResult, error) {
voucher, err := channelState.Voucher()
if err != nil {
return datatransfer.ValidationResult{}, err
}
voucher := channelState.Voucher()

proposal, err := rm.DealProposalFromNode(voucher.Voucher)
if err != nil {
return datatransfer.ValidationResult{}, errors.New("wrong voucher type")
Expand Down
11 changes: 6 additions & 5 deletions retrievalmarket/retrieval_restart_integration_test.go
Expand Up @@ -7,12 +7,13 @@ import (

"github.com/ipfs/go-datastore"
logger "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/require"

datatransfer "github.com/filecoin-project/go-data-transfer/v2"
"github.com/filecoin-project/go-data-transfer/v2/channelmonitor"
dtimpl "github.com/filecoin-project/go-data-transfer/v2/impl"
dtnet "github.com/filecoin-project/go-data-transfer/v2/network"
dtnet "github.com/filecoin-project/go-data-transfer/v2/transport/helpers/network"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"

Expand Down Expand Up @@ -97,8 +98,8 @@ func TestBounceConnectionDealTransferOngoing(t *testing.T) {
td := shared_testutil.NewLibp2pTestData(bgCtx, t)
td.DTNet1 = dtnet.NewFromLibp2pHost(td.Host1, dtClientNetRetry)
depGen := dependencies.NewDepGenerator()
depGen.ClientNewDataTransfer = func(ds datastore.Batching, dir string, transferNetwork dtnet.DataTransferNetwork, transport datatransfer.Transport) (datatransfer.Manager, error) {
return dtimpl.NewDataTransfer(ds, transferNetwork, transport, restartConf)
depGen.ClientNewDataTransfer = func(ds datastore.Batching, peerID peer.ID, transport datatransfer.Transport) (datatransfer.Manager, error) {
return dtimpl.NewDataTransfer(ds, peerID, transport, restartConf)
}
deps := depGen.New(t, bgCtx, td, testnodes.NewStorageMarketState(), "", noOpDelay, noOpDelay)
providerNode := testnodes2.NewTestRetrievalProviderNode()
Expand Down Expand Up @@ -227,8 +228,8 @@ func TestBounceConnectionDealTransferUnsealing(t *testing.T) {
td := shared_testutil.NewLibp2pTestData(bgCtx, t)
td.DTNet1 = dtnet.NewFromLibp2pHost(td.Host1, dtClientNetRetry)
depGen := dependencies.NewDepGenerator()
depGen.ClientNewDataTransfer = func(ds datastore.Batching, dir string, transferNetwork dtnet.DataTransferNetwork, transport datatransfer.Transport) (datatransfer.Manager, error) {
return dtimpl.NewDataTransfer(ds, transferNetwork, transport, restartConf)
depGen.ClientNewDataTransfer = func(ds datastore.Batching, peerID peer.ID, transport datatransfer.Transport) (datatransfer.Manager, error) {
return dtimpl.NewDataTransfer(ds, peerID, transport, restartConf)
}
deps := depGen.New(t, bgCtx, td, testnodes.NewStorageMarketState(), "", noOpDelay, noOpDelay)
providerNode := testnodes2.NewTestRetrievalProviderNode()
Expand Down
2 changes: 1 addition & 1 deletion shared_testutil/mocknet.go
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/net/context"

dtnet "github.com/filecoin-project/go-data-transfer/v2/network"
dtnet "github.com/filecoin-project/go-data-transfer/v2/transport/helpers/network"

"github.com/filecoin-project/go-fil-markets/shared_testutil/unixfs"
)
Expand Down
46 changes: 21 additions & 25 deletions shared_testutil/testchannel.go
Expand Up @@ -112,12 +112,8 @@ func NewTestChannel(params TestChannelParams) datatransfer.ChannelState {
return tc
}

func (tc *TestChannel) ReceivedCidsLen() int {
return len(tc.receivedCids)
}

func (tc *TestChannel) ReceivedCidsTotal() int64 {
return int64(len(tc.receivedCids))
func (tc *TestChannel) ReceivedIndex() datamodel.Node {
return basicnode.NewInt(int64(len(tc.receivedCids)))
}

// TransferID returns the transfer id for this channel
Expand All @@ -136,27 +132,22 @@ func (tc *TestChannel) Selector() datamodel.Node {
return tc.selector
}

// ReceivedCids returns the cids received so far
func (tc *TestChannel) ReceivedCids() []cid.Cid {
return tc.receivedCids
}

// TODO actual implementation of those
func (tc *TestChannel) MissingCids() []cid.Cid {
return nil
}

func (tc *TestChannel) QueuedCidsTotal() int64 {
return 0
func (tc *TestChannel) QueuedIndex() datamodel.Node {
return basicnode.NewInt(0)
}

func (tc *TestChannel) SentCidsTotal() int64 {
return 0
func (tc *TestChannel) SentIndex() datamodel.Node {
return basicnode.NewInt(0)
}

// Voucher returns the voucher for this data transfer
func (tc *TestChannel) Voucher() (datatransfer.TypedVoucher, error) {
return tc.vouchers[0], nil
func (tc *TestChannel) Voucher() datatransfer.TypedVoucher {
return tc.vouchers[0]
}

// Sender returns the peer id for the node that is sending data
Expand Down Expand Up @@ -209,6 +200,11 @@ func (tc *TestChannel) OtherParty(thisParty peer.ID) peer.ID {
return tc.sender
}

func (tc *TestChannel) BothPaused() bool { return false }
func (tc *TestChannel) ResponderPaused() bool { return false }
func (tc *TestChannel) InitiatorPaused() bool { return false }
func (tc *TestChannel) SelfPaused() bool { return false }

// Status is the current status of this channel
func (tc *TestChannel) Status() datatransfer.Status {
return tc.status
Expand All @@ -235,23 +231,23 @@ func (tc *TestChannel) Message() string {
}

// Vouchers returns all vouchers sent on this channel
func (tc *TestChannel) Vouchers() ([]datatransfer.TypedVoucher, error) {
return tc.vouchers, nil
func (tc *TestChannel) Vouchers() []datatransfer.TypedVoucher {
return tc.vouchers
}

// VoucherResults are results of vouchers sent on the channel
func (tc *TestChannel) VoucherResults() ([]datatransfer.TypedVoucher, error) {
return tc.voucherResults, nil
func (tc *TestChannel) VoucherResults() []datatransfer.TypedVoucher {
return tc.voucherResults
}

// LastVoucher returns the last voucher sent on the channel
func (tc *TestChannel) LastVoucher() (datatransfer.TypedVoucher, error) {
return tc.vouchers[len(tc.vouchers)-1], nil
func (tc *TestChannel) LastVoucher() datatransfer.TypedVoucher {
return tc.vouchers[len(tc.vouchers)-1]
}

// LastVoucherResult returns the last voucher result sent on the channel
func (tc *TestChannel) LastVoucherResult() (datatransfer.TypedVoucher, error) {
return tc.voucherResults[len(tc.voucherResults)-1], nil
func (tc *TestChannel) LastVoucherResult() datatransfer.TypedVoucher {
return tc.voucherResults[len(tc.voucherResults)-1]
}

func (tc *TestChannel) Stages() *datatransfer.ChannelStages {
Expand Down
18 changes: 4 additions & 14 deletions storagemarket/impl/dtutils/dtutils.go
Expand Up @@ -32,12 +32,7 @@ type EventReceiver interface {
// event or moving to error if a data transfer error occurs
func ProviderDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber {
return func(event datatransfer.Event, channelState datatransfer.ChannelState) {
node, err := channelState.Voucher()
if err != nil {
log.Errorf("ignoring data-transfer event as the voucher is invalid, event=%s, channelID=%s: %s", datatransfer.Events[event.Code], "channelID",
channelState.ChannelID().String(), err.Error())
return
}
node := channelState.Voucher()
if node.Voucher == nil {
log.Debugw("ignoring data-transfer event as it's not storage related", "event", datatransfer.Events[event.Code], "channelID",
channelState.ChannelID())
Expand Down Expand Up @@ -95,12 +90,7 @@ func ProviderDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber
func ClientDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber {
return func(event datatransfer.Event, channelState datatransfer.ChannelState) {
// TODO: are these log messages valid for Client?
node, err := channelState.Voucher()
if err != nil {
log.Errorf("ignoring data-transfer event as the voucher is invalid, event=%s, channelID=%s: %s", datatransfer.Events[event.Code], "channelID",
channelState.ChannelID().String(), err.Error())
return
}
node := channelState.Voucher()
if node.Voucher == nil {
log.Debugw("ignoring data-transfer event as it's not storage related", "event", datatransfer.Events[event.Code], "channelID",
channelState.ChannelID())
Expand Down Expand Up @@ -135,9 +125,9 @@ func ClientDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber {
return deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferRestarted, channelState.ChannelID())
case datatransfer.Disconnected:
return deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferStalled)
case datatransfer.TransferRequestQueued:
return deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferQueued, channelState.ChannelID())
case datatransfer.Accept:
return deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferQueued, channelState.ChannelID())
case datatransfer.TransferInitiated:
return deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferInitiated, channelState.ChannelID())
case datatransfer.Error:
return deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferFailed, fmt.Errorf("deal data transfer failed: %s", event.Message))
Expand Down