Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use retrieval ask interface as impl argument #645

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 7 additions & 2 deletions retrievalmarket/impl/integration_test.go
Expand Up @@ -3,6 +3,7 @@ package retrievalimpl_test
import (
"bytes"
"context"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/askstore"
"io"
"os"
"path/filepath"
Expand Down Expand Up @@ -160,6 +161,8 @@ func requireSetupTestClientAndProvider(ctx context.Context, t *testing.T, payChA
testutil.StartAndWaitForReady(ctx, t, dt2)
require.NoError(t, err)
providerDs := namespace.Wrap(testData.Ds2, datastore.NewKey("/retrievals/provider"))
as, err := askstore.NewAskStore(namespace.Wrap(providerDs, datastore.NewKey("retrieval-ask")), datastore.NewKey("latest"))
require.NoError(t, err)

priceFunc := func(ctx context.Context, dealPricingParams retrievalmarket.PricingInput) (retrievalmarket.Ask, error) {
ask := retrievalmarket.Ask{}
Expand All @@ -173,7 +176,7 @@ func requireSetupTestClientAndProvider(ctx context.Context, t *testing.T, payChA
// Set up a DAG store
dagstoreWrapper := tut.NewMockDagStoreWrapper(pieceStore, sectorAccessor)
provider, err := retrievalimpl.NewProvider(
paymentAddress, providerNode, sectorAccessor, nw2, pieceStore, dagstoreWrapper, dt2, providerDs,
paymentAddress, providerNode, sectorAccessor, nw2, pieceStore, dagstoreWrapper, dt2, providerDs, as,
priceFunc)
require.NoError(t, err)

Expand Down Expand Up @@ -694,6 +697,8 @@ func setupProvider(
testutil.StartAndWaitForReady(ctx, t, dt2)
require.NoError(t, err)
providerDs := namespace.Wrap(testData.Ds2, datastore.NewKey("/retrievals/provider"))
as, err := askstore.NewAskStore(namespace.Wrap(providerDs, datastore.NewKey("retrieval-ask")), datastore.NewKey("latest"))
require.NoError(t, err)

opts := []retrievalimpl.RetrievalProviderOption{retrievalimpl.DealDeciderOpt(decider)}
if disableNewDeals {
Expand Down Expand Up @@ -721,7 +726,7 @@ func setupProvider(
_ = os.Remove(carFilePath)

provider, err := retrievalimpl.NewProvider(providerPaymentAddr, providerNode, sectorAccessor,
nw2, pieceStore, dagstoreWrapper, dt2, providerDs, priceFunc, opts...)
nw2, pieceStore, dagstoreWrapper, dt2, providerDs, as, priceFunc, opts...)
require.NoError(t, err)

return provider
Expand Down
10 changes: 2 additions & 8 deletions retrievalmarket/impl/provider.go
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/hannahhoward/go-pubsub"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
Expand All @@ -22,7 +21,6 @@ import (

"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/askstore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/dtutils"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/providerstates"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/requestvalidation"
Expand Down Expand Up @@ -108,6 +106,7 @@ func NewProvider(minerAddress address.Address,
dagStore stores.DAGStoreWrapper,
dataTransfer datatransfer.Manager,
ds datastore.Batching,
askStore retrievalmarket.AskStore,
retrievalPricingFunc RetrievalPricingFunc,
opts ...RetrievalProviderOption,
) (retrievalmarket.RetrievalProvider, error) {
Expand All @@ -127,6 +126,7 @@ func NewProvider(minerAddress address.Address,
readySub: pubsub.New(shared.ReadyDispatcher),
retrievalPricingFunc: retrievalPricingFunc,
dagStore: dagStore,
askStore: askStore,
stores: stores.NewReadOnlyBlockstores(),
}

Expand All @@ -135,12 +135,6 @@ func NewProvider(minerAddress address.Address,
return nil, err
}

askStore, err := askstore.NewAskStore(namespace.Wrap(ds, datastore.NewKey("retrieval-ask")), datastore.NewKey("latest"))
if err != nil {
return nil, err
}
p.askStore = askStore

retrievalMigrations, err := migrations.ProviderMigrations.Build()
if err != nil {
return nil, err
Expand Down
26 changes: 19 additions & 7 deletions retrievalmarket/impl/provider_test.go
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/askstore"
"math/rand"
"testing"
"time"
Expand Down Expand Up @@ -150,7 +151,9 @@ func TestDynamicPricing(t *testing.T) {
ds := dss.MutexWrap(datastore.NewMapDatastore())
dagStore := tut.NewMockDagStoreWrapper(pieceStore, sa)
dt := tut.NewTestDataTransfer()
c, err := retrievalimpl.NewProvider(expectedAddress, node, sa, net, pieceStore, dagStore, dt, ds, pFnc)
askStore, err := askstore.NewAskStore(namespace.Wrap(ds, datastore.NewKey("retrieval-ask")), datastore.NewKey("latest"))
require.NoError(t, err)
c, err := retrievalimpl.NewProvider(expectedAddress, node, sa, net, pieceStore, dagStore, dt, ds, askStore, pFnc)
require.NoError(t, err)
tut.StartAndWaitForReady(ctx, t, c)
return c
Expand Down Expand Up @@ -701,6 +704,8 @@ func TestHandleQueryStream(t *testing.T) {

receiveStreamOnProvider := func(t *testing.T, node *testnodes.TestRetrievalProviderNode, sa *testnodes.TestSectorAccessor, qs network.RetrievalQueryStream, pieceStore piecestore.PieceStore) {
ds := dss.MutexWrap(datastore.NewMapDatastore())
as, err := askstore.NewAskStore(namespace.Wrap(ds, datastore.NewKey("retrieval-ask")), datastore.NewKey("latest"))
require.NoError(t, err)
dagStore := tut.NewMockDagStoreWrapper(pieceStore, sa)
dt := tut.NewTestDataTransfer()
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{})
Expand All @@ -719,7 +724,7 @@ func TestHandleQueryStream(t *testing.T) {
return ask, nil
}

c, err := retrievalimpl.NewProvider(expectedAddress, node, sa, net, pieceStore, dagStore, dt, ds, priceFunc)
c, err := retrievalimpl.NewProvider(expectedAddress, node, sa, net, pieceStore, dagStore, dt, ds, as, priceFunc)
require.NoError(t, err)

tut.StartAndWaitForReady(ctx, t, c)
Expand Down Expand Up @@ -930,6 +935,8 @@ func TestHandleQueryStream(t *testing.T) {

func TestProvider_Construct(t *testing.T) {
ds := datastore.NewMapDatastore()
as, err := askstore.NewAskStore(namespace.Wrap(ds, datastore.NewKey("retrieval-ask")), datastore.NewKey("latest"))
require.NoError(t, err)
pieceStore := tut.NewTestPieceStore()
node := testnodes.NewTestRetrievalProviderNode()
sa := testnodes.NewTestSectorAccessor()
Expand All @@ -941,7 +948,7 @@ func TestProvider_Construct(t *testing.T) {
return ask, nil
}

_, err := retrievalimpl.NewProvider(
_, err = retrievalimpl.NewProvider(
spect.NewIDAddr(t, 2344),
node,
sa,
Expand All @@ -950,6 +957,7 @@ func TestProvider_Construct(t *testing.T) {
dagStore,
dt,
ds,
as,
priceFunc,
)
require.NoError(t, err)
Expand Down Expand Up @@ -987,6 +995,8 @@ func TestProviderConfigOpts(t *testing.T) {
opt1 := func(p *retrievalimpl.Provider) { sawOpt++ }
opt2 := func(p *retrievalimpl.Provider) { sawOpt += 2 }
ds := datastore.NewMapDatastore()
as, err := askstore.NewAskStore(namespace.Wrap(ds, datastore.NewKey("retrieval-ask")), datastore.NewKey("latest"))
require.NoError(t, err)
pieceStore := tut.NewTestPieceStore()
node := testnodes.NewTestRetrievalProviderNode()
sa := testnodes.NewTestSectorAccessor()
Expand All @@ -1005,7 +1015,7 @@ func TestProviderConfigOpts(t *testing.T) {
pieceStore,
dagStore,
tut.NewTestDataTransfer(),
ds, priceFunc, opt1, opt2,
ds, as, priceFunc, opt1, opt2,
)
require.NoError(t, err)
assert.NotNil(t, p)
Expand All @@ -1026,7 +1036,7 @@ func TestProviderConfigOpts(t *testing.T) {
tut.NewTestPieceStore(),
dagStore,
tut.NewTestDataTransfer(),
ds, priceFunc, ddOpt)
ds, as, priceFunc, ddOpt)
require.NoError(t, err)
require.NotNil(t, p)
}
Expand Down Expand Up @@ -1073,7 +1083,8 @@ func TestProviderMigrations(t *testing.T) {
dt := tut.NewTestDataTransfer()

providerDs := namespace.Wrap(ds, datastore.NewKey("/retrievals/provider"))

as, err := askstore.NewAskStore(namespace.Wrap(providerDs, datastore.NewKey("retrieval-ask")), datastore.NewKey("latest"))
require.NoError(t, err)
numDeals := 5
payloadCIDs := make([]cid.Cid, numDeals)
iDs := make([]retrievalmarket.DealID, numDeals)
Expand All @@ -1095,7 +1106,7 @@ func TestProviderMigrations(t *testing.T) {
offsets := make([]abi.PaddedPieceSize, numDeals)
lengths := make([]abi.PaddedPieceSize, numDeals)
allSelectorBuf := new(bytes.Buffer)
err := dagcbor.Encode(selectorparse.CommonSelector_ExploreAllRecursively, allSelectorBuf)
err = dagcbor.Encode(selectorparse.CommonSelector_ExploreAllRecursively, allSelectorBuf)
require.NoError(t, err)
allSelectorBytes := allSelectorBuf.Bytes()

Expand Down Expand Up @@ -1190,6 +1201,7 @@ func TestProviderMigrations(t *testing.T) {
dagStore,
dt,
providerDs,
as,
priceFunc,
)
require.NoError(t, err)
Expand Down