Skip to content

Commit

Permalink
run bitsap retrievals
Browse files Browse the repository at this point in the history
  • Loading branch information
aarshkshah1992 committed Jan 21, 2022
1 parent eb2fe1f commit 6d24ebc
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 13 deletions.
5 changes: 4 additions & 1 deletion go.mod
Expand Up @@ -3,7 +3,7 @@ module github.com/filecoin-project/go-fil-markets
go 1.13

require (
github.com/filecoin-project/dagstore v0.5.2-0.20220120115845-e07b050f48d3
github.com/filecoin-project/dagstore v0.5.2-0.20220121144931-72f676fe8f38
github.com/filecoin-project/go-address v0.0.5
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2
github.com/filecoin-project/go-commp-utils v0.1.3
Expand All @@ -22,6 +22,7 @@ require (
github.com/hannahhoward/cbor-gen-for v0.0.0-20200817222906-ea96cece81f1
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e
github.com/hashicorp/go-multierror v1.1.1
github.com/ipfs/go-bitswap v0.5.1
github.com/ipfs/go-block-format v0.0.3
github.com/ipfs/go-blockservice v0.2.1
github.com/ipfs/go-cid v0.1.0
Expand All @@ -33,8 +34,10 @@ require (
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-ds-help v1.1.0
github.com/ipfs/go-ipfs-exchange-interface v0.1.0
github.com/ipfs/go-ipfs-exchange-offline v0.1.1
github.com/ipfs/go-ipfs-files v0.0.9
github.com/ipfs/go-ipfs-routing v0.2.1
github.com/ipfs/go-ipld-cbor v0.0.5
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-log/v2 v2.5.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -200,8 +200,8 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7
github.com/facebookgo/atomicfile v0.0.0-20151019160806-2de1f203e7d5/go.mod h1:JpoxHjuQauoxiFMl1ie8Xc/7TfLuMZ5eOCONd1sUBHg=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/filecoin-project/dagstore v0.5.2-0.20220120115845-e07b050f48d3 h1:kuZsRFDjyrDAgoHNolJBnO7xX1EETHdJ090sgD3Al5E=
github.com/filecoin-project/dagstore v0.5.2-0.20220120115845-e07b050f48d3/go.mod h1:OdlK3x5m3Mol874WC2bI79H4H2+leN+FabwWdW2D/wY=
github.com/filecoin-project/dagstore v0.5.2-0.20220121144931-72f676fe8f38 h1:p8mDOiiMDwDpCYXNJ/+y6dlzMMiu4Y8BEaHBnehOyig=
github.com/filecoin-project/dagstore v0.5.2-0.20220121144931-72f676fe8f38/go.mod h1:7aV6HIrDeX1ypja7BeSOF9lwGX9CCbYuBjLXXXp+5sY=
github.com/filecoin-project/go-address v0.0.3/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8=
github.com/filecoin-project/go-address v0.0.5 h1:SSaFT/5aLfPXycUlFyemoHYhRgdyXClXCyDdNJKPlDM=
github.com/filecoin-project/go-address v0.0.5/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8=
Expand Down
4 changes: 2 additions & 2 deletions retrievalmarket/impl/integration_test.go
Expand Up @@ -178,7 +178,7 @@ func requireSetupTestClientAndProvider(ctx context.Context, t *testing.T, payChA
return ask, nil
}

provider, err := retrievalimpl.NewProvider(
provider, err := retrievalimpl.NewProvider(testData.Host2,
paymentAddress, providerNode, sectorAccessor, nw2, pieceStore, dagstoreWrapper, dt2, providerDs,
priceFunc)
require.NoError(t, err)
Expand Down Expand Up @@ -727,7 +727,7 @@ func setupProvider(
// (instead of using the cached CAR file)
_ = os.Remove(carFilePath)

provider, err := retrievalimpl.NewProvider(providerPaymentAddr, providerNode, sectorAccessor,
provider, err := retrievalimpl.NewProvider(testData.Host2, providerPaymentAddr, providerNode, sectorAccessor,
nw2, pieceStore, dagstoreWrapper, dt2, providerDs, priceFunc, opts...)
require.NoError(t, err)

Expand Down
83 changes: 81 additions & 2 deletions retrievalmarket/impl/provider.go
Expand Up @@ -6,6 +6,15 @@ import (
"fmt"
"time"

"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/shard"
exchange "github.com/ipfs/go-ipfs-exchange-interface"

"github.com/ipfs/go-bitswap"
bsnetwork "github.com/ipfs/go-bitswap/network"
nilrouting "github.com/ipfs/go-ipfs-routing/none"
"github.com/libp2p/go-libp2p-core/host"

"github.com/hannahhoward/go-pubsub"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
Expand Down Expand Up @@ -44,6 +53,7 @@ var queryTimeout = 5 * time.Second

// Provider is the production implementation of the RetrievalProvider interface
type Provider struct {
h host.Host
dataTransfer datatransfer.Manager
node retrievalmarket.RetrievalProviderNode
sa retrievalmarket.SectorAccessor
Expand All @@ -62,6 +72,7 @@ type Provider struct {
retrievalPricingFunc RetrievalPricingFunc
dagStore stores.DAGStoreWrapper
stores *stores.ReadOnlyBlockstores
bsServer exchange.Interface
}

type internalProviderEvent struct {
Expand Down Expand Up @@ -100,7 +111,9 @@ func DisableNewDeals() RetrievalProviderOption {
}

// NewProvider returns a new retrieval Provider
func NewProvider(minerAddress address.Address,
func NewProvider(
h host.Host,
minerAddress address.Address,
node retrievalmarket.RetrievalProviderNode,
sa retrievalmarket.SectorAccessor,
network rmnet.RetrievalMarketNetwork,
Expand All @@ -117,6 +130,7 @@ func NewProvider(minerAddress address.Address,
}

p := &Provider{
h: h,
dataTransfer: dataTransfer,
node: node,
sa: sa,
Expand Down Expand Up @@ -214,12 +228,34 @@ func NewProvider(minerAddress address.Address,

// Stop stops handling incoming requests.
func (p *Provider) Stop() error {
p.bsServer.Close()
return p.network.StopHandlingRequests()
}

// Start begins listening for deals on the given host.
// Start must be called in order to accept incoming deals.
func (p *Provider) Start(ctx context.Context) error {
abs, err := p.dagStore.AllShardsReadBlockstore(dagstore.ShardSelectorF(func(c cid.Cid, shards []shard.Key) (shard.Key, error) {
for _, sk := range shards {
pieceCid, err := cid.Parse(sk.String())
if err != nil {
return shard.Key{}, fmt.Errorf("failed to parse cid")
}
b, err := p.isFreeAndUnsealed(ctx, c, pieceCid)
if err != nil {
return shard.Key{}, fmt.Errorf("failed to verify is piece is free and unsealed")
}
if b {
return sk, nil
}
}

return shard.Key{}, errors.New("no shard is free and unsealed")
}))
if err != nil {
return fmt.Errorf("failed to create blockstore for bitswap retrievals: %w", err)
}

go func() {
err := p.migrateStateMachines(ctx)
if err != nil {
Expand All @@ -230,6 +266,16 @@ func (p *Provider) Start(ctx context.Context) error {
log.Warnf("Publish retrieval provider ready event: %s", err.Error())
}
}()

// start a bitswap session on the provider
nilRouter, err := nilrouting.ConstructNilRouting(nil, nil, nil, nil)
if err != nil {
return err
}
bsopts := []bitswap.Option{bitswap.MaxOutstandingBytesPerPeer(1 << 20)}
bsServer := bitswap.New(ctx, bsnetwork.NewFromIpfsHost(p.h, nilRouter), abs, bsopts...)
p.bsServer = bsServer

return p.network.SetDelegate(p)
}

Expand Down Expand Up @@ -388,9 +434,42 @@ func (p *Provider) HandleQueryStream(stream rmnet.RetrievalQueryStream) {
sendResp(answer)
}

// Can the piece `pieceCid` be served from an unsealed sector and will the corresponding retrieval be free ?
func (p *Provider) isFreeAndUnsealed(ctx context.Context, c cid.Cid, pieceCid cid.Cid) (bool, error) {
pieceInfo, err := p.pieceStore.GetPieceInfo(pieceCid)
if err != nil {
return false, fmt.Errorf("failed to get piece info: %w", err)
}

if !p.pieceInUnsealedSector(ctx, pieceInfo) {
return false, nil
}

// The piece is in an unsealed sector
// Is it marked for free retrieval ?
input := retrievalmarket.PricingInput{
// piece from which the payload will be retrieved
PieceCID: pieceInfo.PieceCID,
PayloadCID: c,
Unsealed: true,
}

var dealsIds []abi.DealID
for _, d := range pieceInfo.Deals {
dealsIds = append(dealsIds, d.DealID)
}

ask, err := p.GetDynamicAsk(ctx, input, dealsIds)
if err != nil {
return false, fmt.Errorf("failed to get retrieval ask: %w", err)
}

return ask.PricePerByte.NilOrZero(), nil
}

// Given the CID of a block, find a piece that contains that block.
// If the client has specified which piece they want, return that piece.
// Otherwise prefer pieces that are already unsealed.
// Otherwise prefer pieces that are already unsealed & free.
func (p *Provider) getPieceInfoFromCid(ctx context.Context, payloadCID, clientPieceCID cid.Cid) (piecestore.PieceInfo, bool, error) {
// Get all pieces that contain the target block
piecesWithTargetBlock, err := p.dagStore.GetPiecesContainingBlock(payloadCID)
Expand Down
22 changes: 17 additions & 5 deletions retrievalmarket/impl/provider_test.go
Expand Up @@ -8,6 +8,8 @@ import (
"testing"
"time"

mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
Expand Down Expand Up @@ -137,9 +139,12 @@ func TestDynamicPricing(t *testing.T) {
net *tut.TestRetrievalMarketNetwork,
pFnc retrievalimpl.RetrievalPricingFunc,
) retrievalmarket.RetrievalProvider {
h, err := mocknet.New().GenPeer()
require.NoError(t, err)

ds := dss.MutexWrap(datastore.NewMapDatastore())
dt := tut.NewTestDataTransfer()
c, err := retrievalimpl.NewProvider(expectedAddress, node, sa, net, pieceStore, dagStore, dt, ds, pFnc)
c, err := retrievalimpl.NewProvider(h, expectedAddress, node, sa, net, pieceStore, dagStore, dt, ds, pFnc)
require.NoError(t, err)
tut.StartAndWaitForReady(ctx, t, c)
return c
Expand Down Expand Up @@ -718,7 +723,10 @@ func TestHandleQueryStream(t *testing.T) {
return ask, nil
}

c, err := retrievalimpl.NewProvider(expectedAddress, node, sa, net, pieceStore, dagStore, dt, ds, priceFunc)
h, err := mocknet.New().GenPeer()
require.NoError(t, err)

c, err := retrievalimpl.NewProvider(h, expectedAddress, node, sa, net, pieceStore, dagStore, dt, ds, priceFunc)
require.NoError(t, err)

tut.StartAndWaitForReady(ctx, t, c)
Expand Down Expand Up @@ -945,7 +953,7 @@ func TestProvider_Construct(t *testing.T) {
return ask, nil
}

_, err := retrievalimpl.NewProvider(
_, err := retrievalimpl.NewProvider(nil,
spect.NewIDAddr(t, 2344),
node,
sa,
Expand Down Expand Up @@ -1002,6 +1010,7 @@ func TestProviderConfigOpts(t *testing.T) {
}

p, err := retrievalimpl.NewProvider(
nil,
spect.NewIDAddr(t, 2344),
node,
sa,
Expand All @@ -1022,7 +1031,7 @@ func TestProviderConfigOpts(t *testing.T) {
return true, "yes", nil
})

p, err = retrievalimpl.NewProvider(
p, err = retrievalimpl.NewProvider(nil,
spect.NewIDAddr(t, 2344),
testnodes.NewTestRetrievalProviderNode(),
testnodes.NewTestSectorAccessor(),
Expand Down Expand Up @@ -1183,7 +1192,10 @@ func TestProviderMigrations(t *testing.T) {
return ask, nil
}

retrievalProvider, err := retrievalimpl.NewProvider(
h, err := mocknet.New().GenPeer()
require.NoError(t, err)

retrievalProvider, err := retrievalimpl.NewProvider(h,
spect.NewIDAddr(t, 2344),
node,
sa,
Expand Down
2 changes: 1 addition & 1 deletion retrievalmarket/storage_retrieval_integration_test.go
Expand Up @@ -478,7 +478,7 @@ func newRetrievalHarnessWithDeps(
return ask, nil
}

provider, err := retrievalimpl.NewProvider(
provider, err := retrievalimpl.NewProvider(sh.TestData.Host2,
providerPaymentAddr, providerNode, sa, nw2, pieceStore,
sh.DagStore, sh.DTProvider, providerDs, priceFunc)
require.NoError(t, err)
Expand Down
6 changes: 6 additions & 0 deletions shared_testutil/mockdagstorewrapper.go
Expand Up @@ -6,6 +6,9 @@ import (
"os"
"sync"

"github.com/ipfs/go-datastore"
bstore "github.com/ipfs/go-ipfs-blockstore"

"github.com/ipfs/go-cid"
carv2 "github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/blockstore"
Expand Down Expand Up @@ -91,6 +94,9 @@ func (m *MockDagStoreWrapper) ClearRegistrations() {

m.registrations = make(map[cid.Cid]registration)
}
func (m *MockDagStoreWrapper) AllShardsReadBlockstore(f dagstore.ShardSelectorF) (bstore.Blockstore, error) {
return bstore.NewBlockstore(datastore.NewMapDatastore()), nil
}

func (m *MockDagStoreWrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (stores.ClosableBlockstore, error) {
m.lk.Lock()
Expand Down
2 changes: 2 additions & 0 deletions stores/dagstore.go
Expand Up @@ -40,6 +40,8 @@ type DAGStoreWrapper interface {

GetIterableIndexForPiece(pieceCid cid.Cid) (carindex.IterableIndex, error)

AllShardsReadBlockstore(f dagstore.ShardSelectorF) (bstore.Blockstore, error)

// Close closes the dag store wrapper.
Close() error
}
Expand Down

0 comments on commit 6d24ebc

Please sign in to comment.