Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Bitswap retrievals in SPs #674

Draft
wants to merge 3 commits into
base: feat/retrieve-any-cid
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion go.mod
Expand Up @@ -3,7 +3,7 @@ module github.com/filecoin-project/go-fil-markets
go 1.13

require (
github.com/filecoin-project/dagstore v0.5.2-0.20220120115845-e07b050f48d3
github.com/filecoin-project/dagstore v0.5.2-0.20220121144931-72f676fe8f38
github.com/filecoin-project/go-address v0.0.5
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2
github.com/filecoin-project/go-commp-utils v0.1.3
Expand All @@ -22,6 +22,7 @@ require (
github.com/hannahhoward/cbor-gen-for v0.0.0-20200817222906-ea96cece81f1
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e
github.com/hashicorp/go-multierror v1.1.1
github.com/ipfs/go-bitswap v0.5.1
github.com/ipfs/go-block-format v0.0.3
github.com/ipfs/go-blockservice v0.2.1
github.com/ipfs/go-cid v0.1.0
Expand All @@ -33,8 +34,10 @@ require (
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-ds-help v1.1.0
github.com/ipfs/go-ipfs-exchange-interface v0.1.0
github.com/ipfs/go-ipfs-exchange-offline v0.1.1
github.com/ipfs/go-ipfs-files v0.0.9
github.com/ipfs/go-ipfs-routing v0.2.1
github.com/ipfs/go-ipld-cbor v0.0.5
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-log/v2 v2.5.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -200,8 +200,8 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7
github.com/facebookgo/atomicfile v0.0.0-20151019160806-2de1f203e7d5/go.mod h1:JpoxHjuQauoxiFMl1ie8Xc/7TfLuMZ5eOCONd1sUBHg=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/filecoin-project/dagstore v0.5.2-0.20220120115845-e07b050f48d3 h1:kuZsRFDjyrDAgoHNolJBnO7xX1EETHdJ090sgD3Al5E=
github.com/filecoin-project/dagstore v0.5.2-0.20220120115845-e07b050f48d3/go.mod h1:OdlK3x5m3Mol874WC2bI79H4H2+leN+FabwWdW2D/wY=
github.com/filecoin-project/dagstore v0.5.2-0.20220121144931-72f676fe8f38 h1:p8mDOiiMDwDpCYXNJ/+y6dlzMMiu4Y8BEaHBnehOyig=
github.com/filecoin-project/dagstore v0.5.2-0.20220121144931-72f676fe8f38/go.mod h1:7aV6HIrDeX1ypja7BeSOF9lwGX9CCbYuBjLXXXp+5sY=
github.com/filecoin-project/go-address v0.0.3/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8=
github.com/filecoin-project/go-address v0.0.5 h1:SSaFT/5aLfPXycUlFyemoHYhRgdyXClXCyDdNJKPlDM=
github.com/filecoin-project/go-address v0.0.5/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8=
Expand Down
4 changes: 2 additions & 2 deletions retrievalmarket/impl/integration_test.go
Expand Up @@ -178,7 +178,7 @@ func requireSetupTestClientAndProvider(ctx context.Context, t *testing.T, payChA
return ask, nil
}

provider, err := retrievalimpl.NewProvider(
provider, err := retrievalimpl.NewProvider(testData.Host2,
paymentAddress, providerNode, sectorAccessor, nw2, pieceStore, dagstoreWrapper, dt2, providerDs,
priceFunc)
require.NoError(t, err)
Expand Down Expand Up @@ -727,7 +727,7 @@ func setupProvider(
// (instead of using the cached CAR file)
_ = os.Remove(carFilePath)

provider, err := retrievalimpl.NewProvider(providerPaymentAddr, providerNode, sectorAccessor,
provider, err := retrievalimpl.NewProvider(testData.Host2, providerPaymentAddr, providerNode, sectorAccessor,
nw2, pieceStore, dagstoreWrapper, dt2, providerDs, priceFunc, opts...)
require.NoError(t, err)

Expand Down
83 changes: 81 additions & 2 deletions retrievalmarket/impl/provider.go
Expand Up @@ -7,11 +7,18 @@ import (
"time"

"github.com/hannahhoward/go-pubsub"
"github.com/ipfs/go-bitswap"
bsnetwork "github.com/ipfs/go-bitswap/network"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
nilrouting "github.com/ipfs/go-ipfs-routing/none"
"github.com/libp2p/go-libp2p-core/host"
"golang.org/x/xerrors"

"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/shard"
"github.com/filecoin-project/go-address"
datatransfer "github.com/filecoin-project/go-data-transfer"
versioning "github.com/filecoin-project/go-ds-versioning/pkg"
Expand Down Expand Up @@ -44,6 +51,7 @@ var queryTimeout = 5 * time.Second

// Provider is the production implementation of the RetrievalProvider interface
type Provider struct {
h host.Host
dataTransfer datatransfer.Manager
node retrievalmarket.RetrievalProviderNode
sa retrievalmarket.SectorAccessor
Expand All @@ -62,6 +70,7 @@ type Provider struct {
retrievalPricingFunc RetrievalPricingFunc
dagStore stores.DAGStoreWrapper
stores *stores.ReadOnlyBlockstores
bsServer exchange.Interface
}

type internalProviderEvent struct {
Expand Down Expand Up @@ -100,7 +109,9 @@ func DisableNewDeals() RetrievalProviderOption {
}

// NewProvider returns a new retrieval Provider
func NewProvider(minerAddress address.Address,
func NewProvider(
h host.Host,
minerAddress address.Address,
node retrievalmarket.RetrievalProviderNode,
sa retrievalmarket.SectorAccessor,
network rmnet.RetrievalMarketNetwork,
Expand All @@ -117,6 +128,7 @@ func NewProvider(minerAddress address.Address,
}

p := &Provider{
h: h,
dataTransfer: dataTransfer,
node: node,
sa: sa,
Expand Down Expand Up @@ -214,12 +226,34 @@ func NewProvider(minerAddress address.Address,

// Stop stops handling incoming requests.
func (p *Provider) Stop() error {
p.bsServer.Close()
return p.network.StopHandlingRequests()
}

// Start begins listening for deals on the given host.
// Start must be called in order to accept incoming deals.
func (p *Provider) Start(ctx context.Context) error {
abs, err := p.dagStore.AllShardsReadBlockstore(dagstore.ShardSelectorF(func(c cid.Cid, shards []shard.Key) (shard.Key, error) {
for _, sk := range shards {
pieceCid, err := cid.Parse(sk.String())
if err != nil {
return shard.Key{}, fmt.Errorf("failed to parse cid")
}
b, err := p.isFreeAndUnsealed(ctx, c, pieceCid)
if err != nil {
return shard.Key{}, fmt.Errorf("failed to verify is piece is free and unsealed")
}
if b {
return sk, nil
}
}

return shard.Key{}, errors.New("no shard is free and unsealed")
}))
if err != nil {
return fmt.Errorf("failed to create blockstore for bitswap retrievals: %w", err)
}

go func() {
err := p.migrateStateMachines(ctx)
if err != nil {
Expand All @@ -230,6 +264,18 @@ func (p *Provider) Start(ctx context.Context) error {
log.Warnf("Publish retrieval provider ready event: %s", err.Error())
}
}()

// start a bitswap session on the provider
nilRouter, err := nilrouting.ConstructNilRouting(nil, nil, nil, nil)
if err != nil {
return err
}
bsopts := []bitswap.Option{bitswap.MaxOutstandingBytesPerPeer(1 << 20)}
bsServer := bitswap.New(ctx, bsnetwork.NewFromIpfsHost(p.h, nilRouter), abs, bsopts...)
p.bsServer = bsServer

log.Infow("bitswap server running on SP", "multiaddrs", p.h.Addrs(), "peerId", p.h.ID())

return p.network.SetDelegate(p)
}

Expand Down Expand Up @@ -388,9 +434,42 @@ func (p *Provider) HandleQueryStream(stream rmnet.RetrievalQueryStream) {
sendResp(answer)
}

// Can the piece `pieceCid` be served from an unsealed sector and will the corresponding retrieval be free ?
func (p *Provider) isFreeAndUnsealed(ctx context.Context, c cid.Cid, pieceCid cid.Cid) (bool, error) {
pieceInfo, err := p.pieceStore.GetPieceInfo(pieceCid)
if err != nil {
return false, fmt.Errorf("failed to get piece info: %w", err)
}

if !p.pieceInUnsealedSector(ctx, pieceInfo) {
return false, nil
}

// The piece is in an unsealed sector
// Is it marked for free retrieval ?
input := retrievalmarket.PricingInput{
// piece from which the payload will be retrieved
PieceCID: pieceInfo.PieceCID,
PayloadCID: c,
Unsealed: true,
}

var dealsIds []abi.DealID
for _, d := range pieceInfo.Deals {
dealsIds = append(dealsIds, d.DealID)
}

ask, err := p.GetDynamicAsk(ctx, input, dealsIds)
if err != nil {
return false, fmt.Errorf("failed to get retrieval ask: %w", err)
}

return ask.PricePerByte.NilOrZero(), nil
}

// Given the CID of a block, find a piece that contains that block.
// If the client has specified which piece they want, return that piece.
// Otherwise prefer pieces that are already unsealed.
// Otherwise prefer pieces that are already unsealed & free.
func (p *Provider) getPieceInfoFromCid(ctx context.Context, payloadCID, clientPieceCID cid.Cid) (piecestore.PieceInfo, bool, error) {
// Get all pieces that contain the target block
piecesWithTargetBlock, err := p.dagStore.GetPiecesContainingBlock(payloadCID)
Expand Down
21 changes: 16 additions & 5 deletions retrievalmarket/impl/provider_test.go
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ipld/go-ipld-prime/codec/dagcbor"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
"github.com/libp2p/go-libp2p-core/peer"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
cbg "github.com/whyrusleeping/cbor-gen"
Expand Down Expand Up @@ -137,9 +138,12 @@ func TestDynamicPricing(t *testing.T) {
net *tut.TestRetrievalMarketNetwork,
pFnc retrievalimpl.RetrievalPricingFunc,
) retrievalmarket.RetrievalProvider {
h, err := mocknet.New().GenPeer()
require.NoError(t, err)

ds := dss.MutexWrap(datastore.NewMapDatastore())
dt := tut.NewTestDataTransfer()
c, err := retrievalimpl.NewProvider(expectedAddress, node, sa, net, pieceStore, dagStore, dt, ds, pFnc)
c, err := retrievalimpl.NewProvider(h, expectedAddress, node, sa, net, pieceStore, dagStore, dt, ds, pFnc)
require.NoError(t, err)
tut.StartAndWaitForReady(ctx, t, c)
return c
Expand Down Expand Up @@ -718,7 +722,10 @@ func TestHandleQueryStream(t *testing.T) {
return ask, nil
}

c, err := retrievalimpl.NewProvider(expectedAddress, node, sa, net, pieceStore, dagStore, dt, ds, priceFunc)
h, err := mocknet.New().GenPeer()
require.NoError(t, err)

c, err := retrievalimpl.NewProvider(h, expectedAddress, node, sa, net, pieceStore, dagStore, dt, ds, priceFunc)
require.NoError(t, err)

tut.StartAndWaitForReady(ctx, t, c)
Expand Down Expand Up @@ -945,7 +952,7 @@ func TestProvider_Construct(t *testing.T) {
return ask, nil
}

_, err := retrievalimpl.NewProvider(
_, err := retrievalimpl.NewProvider(nil,
spect.NewIDAddr(t, 2344),
node,
sa,
Expand Down Expand Up @@ -1002,6 +1009,7 @@ func TestProviderConfigOpts(t *testing.T) {
}

p, err := retrievalimpl.NewProvider(
nil,
spect.NewIDAddr(t, 2344),
node,
sa,
Expand All @@ -1022,7 +1030,7 @@ func TestProviderConfigOpts(t *testing.T) {
return true, "yes", nil
})

p, err = retrievalimpl.NewProvider(
p, err = retrievalimpl.NewProvider(nil,
spect.NewIDAddr(t, 2344),
testnodes.NewTestRetrievalProviderNode(),
testnodes.NewTestSectorAccessor(),
Expand Down Expand Up @@ -1183,7 +1191,10 @@ func TestProviderMigrations(t *testing.T) {
return ask, nil
}

retrievalProvider, err := retrievalimpl.NewProvider(
h, err := mocknet.New().GenPeer()
require.NoError(t, err)

retrievalProvider, err := retrievalimpl.NewProvider(h,
spect.NewIDAddr(t, 2344),
node,
sa,
Expand Down
2 changes: 1 addition & 1 deletion retrievalmarket/storage_retrieval_integration_test.go
Expand Up @@ -478,7 +478,7 @@ func newRetrievalHarnessWithDeps(
return ask, nil
}

provider, err := retrievalimpl.NewProvider(
provider, err := retrievalimpl.NewProvider(sh.TestData.Host2,
providerPaymentAddr, providerNode, sa, nw2, pieceStore,
sh.DagStore, sh.DTProvider, providerDs, priceFunc)
require.NoError(t, err)
Expand Down
5 changes: 5 additions & 0 deletions shared_testutil/mockdagstorewrapper.go
Expand Up @@ -7,6 +7,8 @@ import (
"sync"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
bstore "github.com/ipfs/go-ipfs-blockstore"
carv2 "github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/blockstore"
carindex "github.com/ipld/go-car/v2/index"
Expand Down Expand Up @@ -91,6 +93,9 @@ func (m *MockDagStoreWrapper) ClearRegistrations() {

m.registrations = make(map[cid.Cid]registration)
}
func (m *MockDagStoreWrapper) AllShardsReadBlockstore(f dagstore.ShardSelectorF) (bstore.Blockstore, error) {
return bstore.NewBlockstore(datastore.NewMapDatastore()), nil
}

func (m *MockDagStoreWrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (stores.ClosableBlockstore, error) {
m.lk.Lock()
Expand Down
2 changes: 2 additions & 0 deletions stores/dagstore.go
Expand Up @@ -40,6 +40,8 @@ type DAGStoreWrapper interface {

GetIterableIndexForPiece(pieceCid cid.Cid) (carindex.IterableIndex, error)

AllShardsReadBlockstore(f dagstore.ShardSelectorF) (bstore.Blockstore, error)

// Close closes the dag store wrapper.
Close() error
}
Expand Down