Skip to content

Commit

Permalink
Index publishing work (#673)
Browse files Browse the repository at this point in the history
* feat: retrieve by any CID (not just root CID)

* feat: fix tests

* fix: TestHandleQueryStream

* fix: integration tests

* Publish indexing records to the Network Indexer using the reference provider (#647)

* reference prov integration

* Apply suggestions from code review

Co-authored-by: dirkmc <dirkmdev@gmail.com>

* move provider callback to start

* fix: lint

Co-authored-by: dirkmc <dirkmdev@gmail.com>

* fix conflict

* update dagstore and disable flaky

* refactor: use index-provider instead of indexer-reference-provider

* update to latest data-transfer and index-provider

* announce all deals to the indexer

* update go mods

* update deps

* lint: fix imports

* update dagstore to tagged ver

* update provider

* update deps

* update deps

* log advertisement cid for announcement and update deps

* update deps

* more logging for announcement

* better logic to announce all deals to network indexer

* Connect index provider host to full node (#675)

* add fullnodeApi to Provider

* add idxProvHost

* connect to full node before making announcement

* fix compilation

Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com>

* fix imports

* log connection

* use NetAddrListener iface (#676)

* use NetAddrListener iface

* fix compilation

* fix imports

Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com>

* remove libp2p host connect in favour of mesh creator interface (#678)

* remove libp2p host connect in favour of mesh creator interface

* update test harness

* Upgrade to `index-provider` `v.0.3.0`

Upgrade to the latest `index-provider` which includes two main changes:

1. update to go-legs message format
2. cache eviction bug fix

* log failure to connect to full node without returning it

* update dagstore

* Use the latest tagged release for dagstore dependency

Depend on a concrete tagged release of dagstore that contains fixes to
dagstore repo bloat.

* Upgrade to latest indexing dependencies

* Run `go mod tidy`

Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>
Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com>
Co-authored-by: Masih H. Derkani <m@derkani.org>
  • Loading branch information
4 people committed Mar 1, 2022
1 parent f746f85 commit 871e17d
Show file tree
Hide file tree
Showing 22 changed files with 890 additions and 219 deletions.
34 changes: 22 additions & 12 deletions go.mod
Expand Up @@ -3,22 +3,28 @@ module github.com/filecoin-project/go-fil-markets
go 1.13

require (
github.com/filecoin-project/dagstore v0.4.3-0.20211211192320-72b849e131d2
github.com/filecoin-project/go-address v0.0.5
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/filecoin-project/dagstore v0.5.2
github.com/filecoin-project/go-address v0.0.6
github.com/filecoin-project/go-bitfield v0.2.4 // indirect
github.com/filecoin-project/go-cbor-util v0.0.1
github.com/filecoin-project/go-commp-utils v0.1.3
github.com/filecoin-project/go-crypto v0.0.1 // indirect
github.com/filecoin-project/go-data-transfer v1.14.0
github.com/filecoin-project/go-ds-versioning v0.1.1
github.com/filecoin-project/go-fil-commcid v0.1.0
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0
github.com/filecoin-project/go-padreader v0.0.0-20210723183308-812a16dc01b1
github.com/filecoin-project/go-state-types v0.1.1-0.20210506134452-99b279731c48
github.com/filecoin-project/go-statemachine v0.0.0-20200925024713-05bd7c71fbfe
github.com/filecoin-project/go-padreader v0.0.1
github.com/filecoin-project/go-state-types v0.1.1
github.com/filecoin-project/go-statemachine v1.0.1
github.com/filecoin-project/go-statestore v0.2.0
github.com/filecoin-project/index-provider v0.3.1
github.com/filecoin-project/specs-actors v0.9.13
github.com/filecoin-project/specs-actors/v2 v2.3.5-0.20210114162132-5b58b773f4fb
github.com/filecoin-project/specs-actors/v2 v2.3.6
github.com/filecoin-project/storetheindex v0.3.2
github.com/hannahhoward/cbor-gen-for v0.0.0-20200817222906-ea96cece81f1
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e
github.com/hashicorp/go-multierror v1.1.1
github.com/ipfs/go-block-format v0.0.3
github.com/ipfs/go-blockservice v0.2.1
github.com/ipfs/go-cid v0.1.0
Expand All @@ -31,8 +37,8 @@ require (
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-ds-help v1.1.0
github.com/ipfs/go-ipfs-exchange-offline v0.1.1
github.com/ipfs/go-ipfs-files v0.0.8
github.com/ipfs/go-ipld-cbor v0.0.5
github.com/ipfs/go-ipfs-files v0.0.9
github.com/ipfs/go-ipld-cbor v0.0.6-0.20211211231443-5d9b9e1f6fa8
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-log/v2 v2.5.0
github.com/ipfs/go-merkledag v0.5.1
Expand All @@ -46,16 +52,20 @@ require (
github.com/libp2p/go-libp2p-core v0.14.0
github.com/multiformats/go-multiaddr v0.5.0
github.com/multiformats/go-multibase v0.0.3
github.com/multiformats/go-multicodec v0.3.1-0.20211210143421-a526f306ed2c
github.com/multiformats/go-multicodec v0.4.0
github.com/multiformats/go-multihash v0.1.0
github.com/multiformats/go-varint v0.0.6
github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9
github.com/stretchr/testify v1.7.0
github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11
github.com/whyrusleeping/cbor-gen v0.0.0-20210713220151-be142a5ae1a8
github.com/whyrusleeping/cbor-gen v0.0.0-20211110122933-f57984553008
github.com/xorcare/golden v0.6.1-0.20191112154924-b87f686d7542 // indirect
golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b // indirect
golang.org/x/exp v0.0.0-20210715201039-d37aa40e8013
golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2
golang.org/x/sys v0.0.0-20211209171907-798191bca915 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
lukechampine.com/blake3 v1.1.7 // indirect
)

replace github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi
370 changes: 338 additions & 32 deletions go.sum

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions retrievalmarket/impl/client_test.go
Expand Up @@ -225,6 +225,7 @@ func TestClient_FindProviders(t *testing.T) {
// retrieval deal for the same payload CID with the same peer as an existing
// active deal
func TestClient_DuplicateRetrieve(t *testing.T) {
t.Skip("flaky test")
bgCtx := context.Background()
ctx, cancel := context.WithCancel(bgCtx)
defer cancel()
Expand Down
27 changes: 17 additions & 10 deletions retrievalmarket/impl/integration_test.go
Expand Up @@ -22,6 +22,7 @@ import (
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
dtimpl "github.com/filecoin-project/go-data-transfer/impl"
Expand All @@ -45,7 +46,7 @@ func TestClientCanMakeQueryToProvider(t *testing.T) {
bgCtx := context.Background()
payChAddr := address.TestAddress

client, expectedCIDs, missingPiece, expectedQR, retrievalPeer, _ := requireSetupTestClientAndProvider(bgCtx, t, payChAddr)
client, expectedCIDs, missingPiece, expectedQR, retrievalPeer, _, pieceStore := requireSetupTestClientAndProvider(bgCtx, t, payChAddr)

t.Run("when piece is found, returns piece and price data", func(t *testing.T) {
expectedQR.Status = retrievalmarket.QueryResponseAvailable
Expand All @@ -69,10 +70,12 @@ func TestClientCanMakeQueryToProvider(t *testing.T) {
})

t.Run("when there is some other error, returns error", func(t *testing.T) {
unknownPiece := tut.GenerateCids(1)[0]
pieceStore.ReturnErrorFromGetPieceInfo(xerrors.Errorf("someerr"))
expectedQR.Status = retrievalmarket.QueryResponseError
expectedQR.Message = "failed to fetch piece to retrieve from: get cid info: GetCIDInfo failed"
actualQR, err := client.Query(bgCtx, retrievalPeer, unknownPiece, retrievalmarket.QueryParams{})
expectedQR.PieceCIDFound = retrievalmarket.QueryItemUnavailable
expectedQR.Size = 0
expectedQR.Message = "failed to fetch piece to retrieve from: could not locate piece: someerr"
actualQR, err := client.Query(bgCtx, retrievalPeer, expectedCIDs[0], retrievalmarket.QueryParams{})
assert.NoError(t, err)
actualQR.MaxPaymentInterval = expectedQR.MaxPaymentInterval
actualQR.MinPricePerByte = expectedQR.MinPricePerByte
Expand All @@ -89,19 +92,22 @@ func TestProvider_Stop(t *testing.T) {
}
bgCtx := context.Background()
payChAddr := address.TestAddress
client, expectedCIDs, _, _, retrievalPeer, provider := requireSetupTestClientAndProvider(bgCtx, t, payChAddr)
client, expectedCIDs, _, _, retrievalPeer, provider, _ := requireSetupTestClientAndProvider(bgCtx, t, payChAddr)
require.NoError(t, provider.Stop())
_, err := client.Query(bgCtx, retrievalPeer, expectedCIDs[0], retrievalmarket.QueryParams{})

assert.EqualError(t, err, "exhausted 5 attempts but failed to open stream, err: protocol not supported")
}

func requireSetupTestClientAndProvider(ctx context.Context, t *testing.T, payChAddr address.Address) (retrievalmarket.RetrievalClient,
func requireSetupTestClientAndProvider(ctx context.Context, t *testing.T, payChAddr address.Address) (
retrievalmarket.RetrievalClient,
[]cid.Cid,
cid.Cid,
retrievalmarket.QueryResponse,
retrievalmarket.RetrievalPeer,
retrievalmarket.RetrievalProvider) {
retrievalmarket.RetrievalProvider,
*tut.TestPieceStore,
) {
testData := tut.NewLibp2pTestData(ctx, t)
nw1 := rmnet.NewFromLibp2pHost(testData.Host1, rmnet.RetryParameters(100*time.Millisecond, 1*time.Second, 5, 5))
cids := tut.GenerateCids(2)
Expand Down Expand Up @@ -130,6 +136,7 @@ func requireSetupTestClientAndProvider(ctx context.Context, t *testing.T, payChA
expectedPieceCIDs := tut.GenerateCids(3)
missingCID := tut.GenerateCids(1)[0]
expectedQR := tut.MakeTestQueryResponse()
dagstoreWrapper := tut.NewMockDagStoreWrapper(pieceStore, sectorAccessor)

pieceStore.ExpectMissingCID(missingCID)
for i, c := range expectedCIDs {
Expand All @@ -140,6 +147,7 @@ func requireSetupTestClientAndProvider(ctx context.Context, t *testing.T, payChA
},
},
})
dagstoreWrapper.AddBlockToPieceIndex(c, expectedPieceCIDs[i])
}
for i, piece := range expectedPieceCIDs {
pieceStore.ExpectPiece(piece, piecestore.PieceInfo{
Expand Down Expand Up @@ -170,8 +178,6 @@ func requireSetupTestClientAndProvider(ctx context.Context, t *testing.T, payChA
return ask, nil
}

// Set up a DAG store
dagstoreWrapper := tut.NewMockDagStoreWrapper(pieceStore, sectorAccessor)
provider, err := retrievalimpl.NewProvider(
paymentAddress, providerNode, sectorAccessor, nw2, pieceStore, dagstoreWrapper, dt2, providerDs,
priceFunc)
Expand All @@ -186,7 +192,7 @@ func requireSetupTestClientAndProvider(ctx context.Context, t *testing.T, payChA

expectedQR.Size = uint64(abi.PaddedPieceSize(expectedQR.Size).Unpadded())

return client, expectedCIDs, missingCID, expectedQR, retrievalPeer, provider
return client, expectedCIDs, missingCID, expectedQR, retrievalPeer, provider, pieceStore
}

func TestClientCanMakeDealWithProvider(t *testing.T) {
Expand Down Expand Up @@ -711,6 +717,7 @@ func setupProvider(

// Create a DAG store wrapper
dagstoreWrapper := tut.NewMockDagStoreWrapper(pieceStore, sectorAccessor)
dagstoreWrapper.AddBlockToPieceIndex(payloadCID, pieceInfo.PieceCID)

// Register the piece with the DAG store wrapper
err = stores.RegisterShardSync(ctx, dagstoreWrapper, pieceInfo.PieceCID, carFilePath, true)
Expand Down
94 changes: 83 additions & 11 deletions retrievalmarket/impl/provider.go
Expand Up @@ -354,7 +354,7 @@ func (p *Provider) HandleQueryStream(stream rmnet.RetrievalQueryStream) {
answer.Size = uint64(pieceInfo.Deals[0].Length.Unpadded()) // TODO: verify on intermediate
answer.PieceCIDFound = retrievalmarket.QueryItemAvailable

storageDeals, err := storageDealsForPiece(query.PieceCID != nil, query.PayloadCID, pieceInfo, p.pieceStore)
storageDeals, err := p.storageDealsForPiece(query.PieceCID != nil, query.PayloadCID, pieceInfo)
if err != nil {
log.Errorf("Retrieval query: storageDealsForPiece: %s", err)
answer.Status = retrievalmarket.QueryResponseError
Expand Down Expand Up @@ -388,48 +388,61 @@ func (p *Provider) HandleQueryStream(stream rmnet.RetrievalQueryStream) {
sendResp(answer)
}

func (p *Provider) getPieceInfoFromCid(ctx context.Context, payloadCID, pieceCID cid.Cid) (piecestore.PieceInfo, bool, error) {
cidInfo, err := p.pieceStore.GetCIDInfo(payloadCID)
// Given the CID of a block, find a piece that contains that block.
// If the client has specified which piece they want, return that piece.
// Otherwise prefer pieces that are already unsealed.
func (p *Provider) getPieceInfoFromCid(ctx context.Context, payloadCID, clientPieceCID cid.Cid) (piecestore.PieceInfo, bool, error) {
// Get all pieces that contain the target block
piecesWithTargetBlock, err := p.dagStore.GetPiecesContainingBlock(payloadCID)
if err != nil {
return piecestore.PieceInfoUndefined, false, xerrors.Errorf("get cid info: %w", err)
return piecestore.PieceInfoUndefined, false, xerrors.Errorf("getting pieces for cid %s: %w", payloadCID, err)
}

// For each piece that contains the target block
var lastErr error
var sealedPieceInfo *piecestore.PieceInfo

for _, pieceBlockLocation := range cidInfo.PieceBlockLocations {
pieceInfo, err := p.pieceStore.GetPieceInfo(pieceBlockLocation.PieceCID)
for _, pieceWithTargetBlock := range piecesWithTargetBlock {
// Get the deals for the piece
pieceInfo, err := p.pieceStore.GetPieceInfo(pieceWithTargetBlock)
if err != nil {
lastErr = err
continue
}

// if client wants to retrieve the payload from a specific piece, just return that piece.
if pieceCID.Defined() && pieceInfo.PieceCID.Equals(pieceCID) {
if clientPieceCID.Defined() && pieceInfo.PieceCID.Equals(clientPieceCID) {
return pieceInfo, p.pieceInUnsealedSector(ctx, pieceInfo), nil
}

// if client dosen't have a preference for a particular piece, prefer a piece
// if client doesn't have a preference for a particular piece, prefer a piece
// for which an unsealed sector exists.
if pieceCID.Equals(cid.Undef) {
if clientPieceCID.Equals(cid.Undef) {
if p.pieceInUnsealedSector(ctx, pieceInfo) {
// The piece is in an unsealed sector, so just return it
return pieceInfo, true, nil
}

if sealedPieceInfo == nil {
// The piece is not in an unsealed sector, so save it but keep
// checking other pieces to see if there is one that is in an
// unsealed sector
sealedPieceInfo = &pieceInfo
}
}

}

// Found a piece containing the target block, piece is in a sealed sector
if sealedPieceInfo != nil {
return *sealedPieceInfo, false, nil
}

// Couldn't find a piece containing the target block
if lastErr == nil {
lastErr = xerrors.Errorf("unknown pieceCID %s", pieceCID.String())
lastErr = xerrors.Errorf("unknown pieceCID %s", clientPieceCID.String())
}

// Error finding a piece containing the target block
return piecestore.PieceInfoUndefined, false, xerrors.Errorf("could not locate piece: %w", lastErr)
}

Expand All @@ -448,6 +461,65 @@ func (p *Provider) pieceInUnsealedSector(ctx context.Context, pieceInfo piecesto
return false
}

func (p *Provider) storageDealsForPiece(clientSpecificPiece bool, payloadCID cid.Cid, pieceInfo piecestore.PieceInfo) ([]abi.DealID, error) {
var storageDeals []abi.DealID
var err error
if clientSpecificPiece {
// If the user wants to retrieve the payload from a specific piece,
// we only need to inspect storage deals made for that piece to quote a price.
for _, d := range pieceInfo.Deals {
storageDeals = append(storageDeals, d.DealID)
}
} else {
// If the user does NOT want to retrieve from a specific piece, we'll have to inspect all storage deals
// made for that piece to quote a price.
storageDeals, err = p.getAllDealsContainingPayload(payloadCID)
if err != nil {
return nil, xerrors.Errorf("failed to fetch deals for payload: %w", err)
}
}

if len(storageDeals) == 0 {
return nil, xerrors.New("no storage deals found")
}

return storageDeals, nil
}

func (p *Provider) getAllDealsContainingPayload(payloadCID cid.Cid) ([]abi.DealID, error) {
// Get all pieces that contain the target block
piecesWithTargetBlock, err := p.dagStore.GetPiecesContainingBlock(payloadCID)
if err != nil {
return nil, xerrors.Errorf("getting pieces for cid %s: %w", payloadCID, err)
}

// For each piece that contains the target block
var lastErr error
var dealsIds []abi.DealID
for _, pieceWithTargetBlock := range piecesWithTargetBlock {
// Get the deals for the piece
pieceInfo, err := p.pieceStore.GetPieceInfo(pieceWithTargetBlock)
if err != nil {
lastErr = err
continue
}

for _, d := range pieceInfo.Deals {
dealsIds = append(dealsIds, d.DealID)
}
}

if lastErr == nil && len(dealsIds) == 0 {
return nil, xerrors.New("no deals found")
}

if lastErr != nil && len(dealsIds) == 0 {
return nil, xerrors.Errorf("failed to fetch deals containing payload %s: %w", payloadCID, lastErr)
}

return dealsIds, nil
}

// GetDynamicAsk quotes a dynamic price for the retrieval deal by calling the user configured
// dynamic pricing function. It passes the static price parameters set in the Ask Store to the pricing function.
func (p *Provider) GetDynamicAsk(ctx context.Context, input retrievalmarket.PricingInput, storageDeals []abi.DealID) (retrievalmarket.Ask, error) {
Expand Down
57 changes: 1 addition & 56 deletions retrievalmarket/impl/provider_environments.go
Expand Up @@ -31,7 +31,7 @@ type providerValidationEnvironment struct {
func (pve *providerValidationEnvironment) GetAsk(ctx context.Context, payloadCid cid.Cid, pieceCid *cid.Cid,
piece piecestore.PieceInfo, isUnsealed bool, client peer.ID) (retrievalmarket.Ask, error) {

storageDeals, err := storageDealsForPiece(pieceCid != nil, payloadCid, piece, pve.p.pieceStore)
storageDeals, err := pve.p.storageDealsForPiece(pieceCid != nil, payloadCid, piece)
if err != nil {
return retrievalmarket.Ask{}, xerrors.Errorf("failed to fetch deals for payload: %w", err)
}
Expand Down Expand Up @@ -178,61 +178,6 @@ func (pde *providerDealEnvironment) DeleteStore(dealID retrievalmarket.DealID) e
return nil
}

func storageDealsForPiece(clientSpecificPiece bool, payloadCID cid.Cid, pieceInfo piecestore.PieceInfo, pieceStore piecestore.PieceStore) ([]abi.DealID, error) {
var storageDeals []abi.DealID
var err error
if clientSpecificPiece {
// If the user wants to retrieve the payload from a specific piece,
// we only need to inspect storage deals made for that piece to quote a price.
for _, d := range pieceInfo.Deals {
storageDeals = append(storageDeals, d.DealID)
}
} else {
// If the user does NOT want to retrieve from a specific piece, we'll have to inspect all storage deals
// made for that piece to quote a price.
storageDeals, err = getAllDealsContainingPayload(pieceStore, payloadCID)
if err != nil {
return nil, xerrors.Errorf("failed to fetch deals for payload: %w", err)
}
}

if len(storageDeals) == 0 {
return nil, xerrors.New("no storage deals found")
}

return storageDeals, nil
}

func getAllDealsContainingPayload(pieceStore piecestore.PieceStore, payloadCID cid.Cid) ([]abi.DealID, error) {
cidInfo, err := pieceStore.GetCIDInfo(payloadCID)
if err != nil {
return nil, xerrors.Errorf("get cid info: %w", err)
}
var dealsIds []abi.DealID
var lastErr error

for _, pieceBlockLocation := range cidInfo.PieceBlockLocations {
pieceInfo, err := pieceStore.GetPieceInfo(pieceBlockLocation.PieceCID)
if err != nil {
lastErr = err
continue
}
for _, d := range pieceInfo.Deals {
dealsIds = append(dealsIds, d.DealID)
}
}

if lastErr == nil && len(dealsIds) == 0 {
return nil, xerrors.New("no deals found")
}

if lastErr != nil && len(dealsIds) == 0 {
return nil, xerrors.Errorf("failed to fetch deals containing payload %s: %w", payloadCID, lastErr)
}

return dealsIds, nil
}

var _ dtutils.StoreGetter = &providerStoreGetter{}

type providerStoreGetter struct {
Expand Down

0 comments on commit 871e17d

Please sign in to comment.